Spaces:
Runtime error
Runtime error
File size: 7,359 Bytes
ca961b4 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 | """
Auto-Save Utilities
Provides decorators and context managers for automatic session saving
after long-running operations like job submissions.
"""
import functools
import threading
import time
from typing import Callable, Optional, Any, Dict
from datetime import datetime, timedelta
from session_integration import SessionIntegration
class AutoSaveManager:
"""
Manages periodic auto-saving of session state.
"""
def __init__(
self,
session_integration: SessionIntegration,
trame_state: Any,
interval_seconds: int = 30
):
"""
Initialize auto-save manager.
Args:
session_integration: SessionIntegration instance
trame_state: Trame server state
interval_seconds: Interval between auto-saves
"""
self.session_integration = session_integration
self.trame_state = trame_state
self.interval = interval_seconds
self.thread: Optional[threading.Thread] = None
self.running = False
self.last_save = datetime.utcnow()
def start(self) -> None:
"""Start the auto-save thread."""
if self.running:
return
self.running = True
self.thread = threading.Thread(target=self._auto_save_loop, daemon=True)
self.thread.name = "SessionAutoSaveThread"
self.thread.start()
def stop(self) -> None:
"""Stop the auto-save thread."""
self.running = False
if self.thread:
self.thread.join(timeout=5)
def _auto_save_loop(self) -> None:
"""Main loop for auto-save thread."""
while self.running:
try:
time.sleep(self.interval)
if self.session_integration.auto_save_enabled:
self.session_integration._capture_trame_state(self.trame_state)
self.session_integration.save_current_session()
self.last_save = datetime.utcnow()
except Exception as e:
print(f"Auto-save error: {e}")
def manual_save(self) -> bool:
"""Manually trigger a save."""
if not self.session_integration.auto_save_enabled:
return False
self.session_integration._capture_trame_state(self.trame_state)
result = self.session_integration.save_current_session()
if result:
self.last_save = datetime.utcnow()
return result
def auto_save_after_operation(
session_integration: SessionIntegration,
trame_state: Any
) -> Callable:
"""
Decorator that automatically saves session after a function completes.
Usage:
@auto_save_after_operation(session_integration, state)
def run_long_job():
# Do work
pass
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
result = func(*args, **kwargs)
finally:
if session_integration.auto_save_enabled:
session_integration._capture_trame_state(trame_state)
session_integration.save_current_session()
return result
return wrapper
return decorator
def save_on_job_submit(
session_integration: SessionIntegration,
trame_state: Any,
job_id_key: str = "job_id",
service_type: str = "unknown"
) -> Callable:
"""
Decorator that tracks job submission and saves session.
Usage:
@save_on_job_submit(session_integration, state, job_id_key="job_id", service_type="qiskit_ibm")
def submit_to_ibm():
job_id = "..."
return job_id
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs):
result = func(*args, **kwargs)
# Extract job ID from result
job_id = None
if isinstance(result, str):
job_id = result
elif isinstance(result, dict) and job_id_key in result:
job_id = result[job_id_key]
# Track job in session
if job_id:
session_integration.add_job(job_id, service_type)
print(f"Job {job_id} tracked in session {session_integration.current_session_id}")
# Save session
if session_integration.auto_save_enabled:
session_integration._capture_trame_state(trame_state)
session_integration.save_current_session()
return result
return wrapper
return decorator
class JobProgressTracker:
"""
Tracks progress of long-running jobs and periodically saves session.
"""
def __init__(
self,
session_integration: SessionIntegration,
trame_state: Any,
job_id: str,
service_type: str = "unknown",
save_interval: int = 10
):
"""
Initialize job progress tracker.
Args:
session_integration: SessionIntegration instance
trame_state: Trame server state
job_id: Job ID to track
service_type: Service type (e.g., "qiskit_ibm")
save_interval: Seconds between progress saves
"""
self.session_integration = session_integration
self.trame_state = trame_state
self.job_id = job_id
self.service_type = service_type
self.save_interval = save_interval
self.last_save = datetime.utcnow()
# Register job in session
session_integration.add_job(job_id, service_type)
def update_progress(self, progress: float, status: str = "running") -> None:
"""
Update job progress.
Args:
progress: Progress 0.0-1.0
status: Job status string
"""
# Check if it's time to save
now = datetime.utcnow()
if now - self.last_save > timedelta(seconds=self.save_interval):
if self.session_integration.auto_save_enabled:
self.session_integration.update_job_status(self.job_id, status)
self.session_integration._capture_trame_state(self.trame_state)
self.session_integration.save_current_session()
self.last_save = now
def complete(self, result: Optional[Dict[str, Any]] = None) -> None:
"""
Mark job as complete.
Args:
result: Optional result data
"""
self.session_integration.update_job_status(
self.job_id,
"completed",
result
)
self.session_integration._capture_trame_state(self.trame_state)
self.session_integration.save_current_session()
def fail(self, error: str = "Unknown error") -> None:
"""
Mark job as failed.
Args:
error: Error message
"""
self.session_integration.update_job_status(
self.job_id,
"failed",
{"error": error}
)
self.session_integration._capture_trame_state(self.trame_state)
self.session_integration.save_current_session()
|