""" EM Embedded - Simulation Module Contains simulation logic including run_simulation_only, reset_to_defaults, and stop handlers. """ import re import asyncio import threading import time import numpy as np from .state import state, ctrl, _apply_workflow_highlights, is_statevector_estimator_selected, is_ibm_qpu_selected from .globals import ( plotter, simulation_data, current_mesh, snapshot_times, stop_simulation, qpu_ts_cache, sim_ts_cache, set_stop_simulation, reset_globals, ) # Import backend functions try: from quantum.utils.delta_impulse_generator import ( create_impulse_state, create_gaussian_state, create_impulse_state_from_pos, create_gaussian_state_from_pos, run_sim, create_time_frames ) import quantum.utils.delta_impulse_generator as qutils except ModuleNotFoundError: from utils.delta_impulse_generator import ( create_impulse_state, create_gaussian_state, create_impulse_state_from_pos, create_gaussian_state_from_pos, run_sim, create_time_frames ) import utils.delta_impulse_generator as qutils # --- Module-level async infrastructure --- _heartbeat_thread = None _heartbeat_on = False _sim_start_time = None _simulation_executor = None # Thread pool for async execution _main_loop = None # Reference to main event loop for thread-safe callbacks def _get_server(): """Get the trame server from state module.""" from .state import get_server return get_server() def _flush_state(): """Force state flush to browser (synchronous, for main thread use).""" try: server = _get_server() if server: server.state.flush() except Exception: pass def _flush_state_threadsafe(): """ Thread-safe state flush - schedules flush on the main event loop. Use this from background threads (e.g., inside executor callbacks). """ global _main_loop try: server = _get_server() if server and _main_loop is not None and _main_loop.is_running(): # Schedule the flush on the main event loop _main_loop.call_soon_threadsafe(server.state.flush) elif server: # Fallback: direct flush (may not work from threads) server.state.flush() except Exception: pass async def _flush_async(): """Async helper to flush state and yield to event loop.""" _flush_state() await asyncio.sleep(0) # Yield control to event loop def _start_progress_heartbeat(): """Start background thread for continuous progress updates.""" global _heartbeat_thread, _heartbeat_on, _sim_start_time if _heartbeat_thread and _heartbeat_thread.is_alive(): return _sim_start_time = time.time() def loop_fn(): global _heartbeat_on while _heartbeat_on: if state.is_running and _sim_start_time is not None: elapsed = time.time() - _sim_start_time state.simulation_elapsed = elapsed _flush_state_threadsafe() # Use thread-safe version time.sleep(0.1) # Update every 100ms _heartbeat_on = True _heartbeat_thread = threading.Thread(target=loop_fn, daemon=True) _heartbeat_thread.start() def _stop_progress_heartbeat(): """Stop the background heartbeat thread.""" global _heartbeat_on, _heartbeat_thread _heartbeat_on = False _heartbeat_thread = None def _auto_hide_status_window(delay_seconds=3.0): """ Schedule the status window to auto-hide after a delay. Shows the completion message briefly then closes automatically. """ def _hide_after_delay(): time.sleep(delay_seconds) state.status_visible = False _flush_state_threadsafe() hide_thread = threading.Thread(target=_hide_after_delay, daemon=True) hide_thread.start() __all__ = [ "run_simulation_only", "reset_to_defaults", "stop_simulation_handler", "log_to_console", "log_message", "setup_surface_plot_data", "generate_plot", "redraw_surface_plot", "update_sim_monitor_points", "add_dotted_unit_grid", "add_dotted_unit_grid_scaled", "build_sim_timeseries_plotly", "update_value_display", ] def update_sim_monitor_points(): """Update simulator monitor points based on timeseries_points input.""" from .utils import snap_samples_to_grid sample_value = state.timeseries_points if not sample_value or not str(sample_value).strip(): state.timeseries_gridpoints = "" state.timeseries_point_info = "" return nx_val = state.nx if nx_val is None: state.timeseries_gridpoints = "" state.timeseries_point_info = "Select a grid size (nx) to compute the nearest monitor positions." return snapped, message = snap_samples_to_grid(sample_value, int(nx_val)) state.timeseries_gridpoints = snapped state.timeseries_point_info = message or "" def log_message(message, level="INFO"): """Log a message to the console.""" from datetime import datetime timestamp = datetime.now().strftime("%H:%M:%S") log_line = f"[{timestamp}] [{level}] {message}\n" current = state.console_logs or "" state.console_logs = current + log_line def log_to_console(message): """Log a message to the console output.""" current = state.console_output or "" state.console_output = current + message + "\n" def setup_surface_plot_data(sim_data, nx): """Setup surface plot data from simulation results - matches em_embedded.py exactly.""" from . import globals as g nx = int(nx) mask = np.arange(1, nx * nx + 1) % nx != 0 g.data_frames = {'Ez': [], 'Hx': [], 'Hy': []} g.surface_clims = {'Ez': [np.inf, -np.inf], 'Hx': [np.inf, -np.inf], 'Hy': [np.inf, -np.inf]} for u in sim_data: ez = u[:nx*nx].reshape(nx, nx) hx = u[2*nx*nx:3*nx*nx-nx].reshape(nx-1, nx) hy = u[-nx*nx:][mask].reshape(nx, nx-1) g.data_frames['Ez'].append(ez) g.data_frames['Hx'].append(hx) g.data_frames['Hy'].append(hy) if ez.size > 0: g.surface_clims['Ez'][0] = min(g.surface_clims['Ez'][0], ez.min()) g.surface_clims['Ez'][1] = max(g.surface_clims['Ez'][1], ez.max()) if hx.size > 0: g.surface_clims['Hx'][0] = min(g.surface_clims['Hx'][0], hx.min()) g.surface_clims['Hx'][1] = max(g.surface_clims['Hx'][1], hx.max()) if hy.size > 0: g.surface_clims['Hy'][0] = min(g.surface_clims['Hy'][0], hy.min()) g.surface_clims['Hy'][1] = max(g.surface_clims['Hy'][1], hy.max()) # Prevent zero-range clims for key in g.surface_clims: if g.surface_clims[key][0] == g.surface_clims[key][1]: g.surface_clims[key][0] -= 1e-9 g.surface_clims[key][1] += 1e-9 # Use integer grid coordinates (like em_embedded.py / app.py) x = np.arange(nx) y = np.arange(nx) x_m1 = np.arange(nx - 1) y_m1 = np.arange(nx - 1) g.X_grids['Ez'], g.Y_grids['Ez'] = np.meshgrid(x, y) g.X_grids['Hx'], g.Y_grids['Hx'] = np.meshgrid(x, y_m1) g.X_grids['Hy'], g.Y_grids['Hy'] = np.meshgrid(x_m1, y) # Compute z_scale for visualization finite_vals = [abs(float(v)) for pair in g.surface_clims.values() for v in pair if np.isfinite(v)] max_abs = max(finite_vals) if finite_vals else 1e-9 g.z_scale = (nx / 2) / max(max_abs, 1e-9) g.simulation_data = sim_data def generate_plot(): """Generate the plot based on output_type selection.""" import re from . import globals as g if not state.simulation_has_run: return plotter.clear() try: plotter.disable_picking() except Exception: pass nx = int(state.nx) if state.output_type == "Surface Plot": redraw_surface_plot() else: # Time Series -> Plotly for Simulator try: points_str = state.timeseries_gridpoints or "" positions = [tuple(map(int, match)) for match in re.findall(r'\((\d+)\s*,\s*(\d+)\)', points_str)] if not positions and (state.timeseries_points or "").strip(): raise ValueError("No valid monitor positions found. Enter (x, y) pairs in [0,1] x [0,1].") fig = build_sim_timeseries_plotly(state.timeseries_field, positions, nx, g.snapshot_times, g.simulation_data) if fig is not None: # Cache the figure for export g.sim_ts_cache["fig"] = fig g.sim_ts_cache["field"] = state.timeseries_field try: ctrl.sim_ts_update(fig) except Exception: pass except Exception as e: state.error_message = f"Plotting Error: {e}" ctrl.view_update() def redraw_surface_plot(): """Redraw the surface plot with current field and time - matches em_embedded.py.""" import pyvista as pv from . import globals as g plotter.clear() field = state.surface_field if g.data_frames is None or not g.data_frames.get(field): return if g.snapshot_times is None or len(g.snapshot_times) == 0: return # Find nearest snapshot index to requested time and clamp to available frames req_t = float(state.time_val) times = np.asarray(g.snapshot_times) idx = int(np.argmin(np.abs(times - req_t))) max_idx = len(g.data_frames[field]) - 1 idx = max(0, min(idx, max_idx)) z_data = g.data_frames[field][idx] X = g.X_grids[field] Y = g.Y_grids[field] points = np.c_[X.ravel(), Y.ravel(), z_data.ravel() * g.z_scale] poly = pv.PolyData(points) mesh = poly.delaunay_2d() mesh['scalars'] = z_data.ravel() g.current_mesh = mesh # Add mesh with styling matching em_embedded.py plotter.add_mesh( mesh, scalars='scalars', # clim=g.surface_clims[field], cmap="turbo", show_scalar_bar=False, show_edges=True, edge_color='grey', line_width=0.5 ) plotter.add_scalar_bar(title=f"{field} Amplitude") # Enable point picking try: plotter.disable_picking() except Exception: pass plotter.enable_point_picking(callback=update_value_display, show_message=False) plotter.add_axes() plotter.view_isometric() try: plotter.camera.parallel_projection = True except Exception: pass ctrl.view_update() # --------------------------------------------------------------------------- # Async Simulation Runner with Full Async Pattern # --------------------------------------------------------------------------- def run_simulation_only(): """ Entry point for simulation - launches the async worker. This is called by the UI button click and schedules the async task. """ server = _get_server() if server is None: log_to_console("Error: Server not available") return # Schedule the async simulation asyncio.ensure_future(_run_simulation_async()) async def _run_simulation_async(): """ Async simulation runner that uses thread pool for blocking work. This allows the UI to update in real-time during simulation. """ global _main_loop from . import globals as g from .excitation import nearest_node_index from .qpu import build_qpu_timeseries_plotly_multi from concurrent.futures import ThreadPoolExecutor # Capture the main event loop for thread-safe callbacks _main_loop = asyncio.get_event_loop() # Create executor for blocking operations executor = ThreadPoolExecutor(max_workers=1) loop = _main_loop # Require selections before running if not state.geometry_selection: state.error_message = "Please select a geometry before running the simulation." log_to_console("Error: Please select a geometry before running.") state.status_visible = True state.status_message = "Error: Please select a geometry before running." state.status_type = "error" state.show_progress = False state.is_running = False state.run_button_text = "RUN!" await _flush_async() return if not state.dist_type: state.error_message = "Please select an initial state before running the simulation." log_to_console("Error: Please select an initial state before running.") state.status_visible = True state.status_message = "Error: Please select an initial state before running." state.status_type = "error" state.show_progress = False state.is_running = False state.run_button_text = "RUN!" await _flush_async() return # Show status: Starting simulation state.status_visible = True state.status_message = "Initializing simulation..." log_to_console("Initializing simulation...") state.status_type = "info" state.show_progress = True state.simulation_progress = 0 await _flush_async() # Start heartbeat for continuous elapsed time updates _start_progress_heartbeat() # Progress callback that updates state (called from worker thread) # Uses thread-safe flush to push updates to browser last_logged_percent = [0] def _progress_callback(percent): state.simulation_progress = percent if percent - last_logged_percent[0] >= 10: log_to_console(f"Simulation progress: {int(percent)}%") last_logged_percent[0] = percent _flush_state_threadsafe() # Thread-safe flush! # Reset stop flag and enable Stop button at start set_stop_simulation(False) state.stop_button_disabled = False plotter.clear() g.current_mesh = None state.error_message = "" state.is_running = True state.simulation_has_run = False state.run_button_text = "Running" # Initial flush to show "Running" state _flush_state() nx, T = int(state.nx), float(state.T) na, R = 1, 4 try: state.status_message = "Creating initial state..." state.simulation_progress = 10 _flush_state() if state.dist_type == "Delta": initial_state = create_impulse_state_from_pos( (nx, nx), (float(state.impulse_x), float(state.impulse_y)), snap_to_grid=True, ) else: initial_state = create_gaussian_state_from_pos( (nx, nx), (float(state.mu_x), float(state.mu_y)), (float(state.sigma_x), float(state.sigma_y)), snap_to_grid=True, ) except ValueError as e: state.error_message = f"Initial State Error: {e}" state.status_message = f"Error: {e}" state.status_type = "error" state.show_progress = False state.is_running = False state.run_button_text = "RUN!" state.stop_button_disabled = True _stop_progress_heartbeat() await _flush_async() executor.shutdown(wait=False) return sve_selected = is_statevector_estimator_selected() # If Statevector Estimator selected, build time series chart and return if sve_selected: try: log_to_console("Running Statevector Estimator...") state.status_message = "Step 1: Initializing Statevector Estimator..." state.simulation_progress = 5 await _flush_async() state.qpu_ts_ready = False state.qpu_plot_style = "display: none; width: 900px; height: 660px; margin: 0 auto;" state.qpu_ts_other_ready = False state.qpu_other_plot_style = "display: none; width: 900px; height: 660px; margin: 0 auto;" # Inputs for QPU snapshot_dt = float(state.dt_user) ix_imp, iy_imp = nearest_node_index(float(state.impulse_x), float(state.impulse_y), nx) impulse_pos = (ix_imp, iy_imp) # Build configs from primitive slots configs = [{ "field": (state.qpu_field_components or "Ez"), "points": (state.qpu_monitor_gridpoints or ""), }] try: cnt = int(state.qpu_monitor_count or 0) except Exception: cnt = 0 for slot_num in range(2, 2 + cnt): f = getattr(state, f"qpu_field_components_{slot_num}", "Ez") or "Ez" p = getattr(state, f"qpu_monitor_gridpoints_{slot_num}", "") or "" configs.append({"field": f, "points": p}) state.status_message = "Step 1: Setting up Statevector Estimator..." state.simulation_progress = 10 await _flush_async() # SVE-specific progress callback that maps internal 0-100% to 10-90% range # and shows appropriate step messages def _sve_progress_callback(pct): # Map internal progress (0-100%) to range 10-90% mapped_pct = 10 + (pct * 0.8) # 10% to 90% state.simulation_progress = int(mapped_pct) if mapped_pct < 30: state.status_message = f"Step 2: Building quantum circuits ({int(mapped_pct)}%)" elif mapped_pct < 70: state.status_message = f"Step 3: Running Statevector simulation ({int(mapped_pct)}%)" else: state.status_message = f"Step 4: Processing results ({int(mapped_pct)}%)" _flush_state_threadsafe() def _sve_series_runner(field_type, positions, total_time, snapshot_dt, nx, impulse_pos, progress_callback=None, print_callback=None): return qutils.run_sve( field_type, positions, None, total_time, snapshot_dt, nx, None, impulse_pos, progress_callback=progress_callback, print_callback=print_callback, ) # Run SVE in executor to keep UI responsive def _run_sve_blocking(): return build_qpu_timeseries_plotly_multi( configs, nx, T, snapshot_dt, impulse_pos, series_runner=_sve_series_runner, progress_callback=_sve_progress_callback, print_callback=log_to_console ) fig = await loop.run_in_executor(executor, _run_sve_blocking) qpu_ts_cache["fig"] = fig # Step 5: Creating plots (90-100%) state.simulation_progress = 95 state.status_message = "Step 5: Creating plots (95%)" _flush_state() try: ctrl.qpu_ts_update(fig) except Exception: pass state.simulation_has_run = True state.run_button_text = "Successful!" state.simulation_progress = 100 state.status_message = "Statevector Estimator simulation completed successfully!" log_to_console("Statevector Estimator run completed") state.status_type = "success" state.show_progress = False _auto_hide_status_window(3.0) # Auto-hide after 3 seconds ready = bool(getattr(fig, "data", None)) and len(fig.data) > 0 state.qpu_ts_ready = ready state.qpu_plot_style = ( "width: 900px; height: 660px; margin: 0 auto;" if ready else "display: none; width: 900px; height: 660px; margin: 0 auto;" ) state.qpu_ts_other_ready = False state.qpu_other_plot_style = "display: none; width: 900px; height: 660px; margin: 0 auto;" if not ready: state.error_message = "No Statevector Estimator time series generated. Check Δt, T, nx, and monitor points." state.status_message = "Warning: No Statevector Estimator time series generated." state.status_type = "warning" log_to_console("Statevector Estimator complete.") except Exception as e: state.error_message = f"Statevector Estimator run failed: {e}" state.status_message = f"Statevector Estimator Error: {e}" state.status_type = "error" state.show_progress = False state.run_button_text = "RUN!" state.qpu_ts_ready = False log_to_console(f"Statevector Estimator error: {e}") finally: state.is_running = False state.stop_button_disabled = True _stop_progress_heartbeat() await _flush_async() executor.shutdown(wait=False) return # IBM QPU branch ibm_qpu_selected = is_ibm_qpu_selected() if ibm_qpu_selected: try: log_to_console("Running IBM QPU simulation...") state.status_message = "Running IBM QPU simulation..." state.simulation_progress = 5 await _flush_async() # Import IBM QPU backend try: from quantum.utils.EBU_Quantum.no_body.base_functions import get_field_values as ibm_get_field_values, create_time_frames as ibm_create_time_frames except ModuleNotFoundError: from utils.EBU_Quantum.no_body.base_functions import get_field_values as ibm_get_field_values, create_time_frames as ibm_create_time_frames # Inputs for IBM QPU (single field, single position only!) snapshot_dt = float(state.dt_user) ix_imp, iy_imp = nearest_node_index(float(state.impulse_x), float(state.impulse_y), nx) impulse_pos = (ix_imp, iy_imp) # Get field and single position from UI # IBM QPU only supports one field and one position! field_type = (state.qpu_field_components or "Ez").strip() if field_type == "All": field_type = "Ez" # Default to Ez if 'All' selected (not supported by IBM QPU) log_to_console("Warning: IBM QPU only supports single field. Defaulting to Ez.") # Parse single monitor position pts_str = str(state.qpu_monitor_gridpoints or "").strip() raw_pts = [tuple(map(int, m)) for m in re.findall(r"\((\d+)\s*,\s*(\d+)\)", pts_str)] if not raw_pts: # Default to impulse position monitor_x, monitor_y = impulse_pos log_to_console(f"No monitor position specified. Using impulse position ({monitor_x}, {monitor_y}).") else: # Use only the first position (IBM QPU restriction) monitor_x, monitor_y = raw_pts[0] if len(raw_pts) > 1: log_to_console(f"Warning: IBM QPU only supports single position. Using first: ({monitor_x}, {monitor_y})") state.status_message = "Step 1: Generating circuit..." state.simulation_progress = 0 await _flush_async() def _ibm_progress_callback(pct, message=None): """ Progress callback for IBM QPU with 4-step pattern: Step 1: Generating circuit (0-10%) Step 2: Optimising Circuit (10-60%) Step 3: Job Submitted + Status monitoring (60-90%) Step 4: Creating Plots (90-100%) """ state.simulation_progress = int(pct) if message: state.status_message = message elif pct < 10: state.status_message = f"Step 1: Generating circuit ({int(pct)}%)" elif pct < 60: # Map 10-40% internal to 10-60% display state.status_message = f"Step 2: Optimising circuit ({int(pct)}%)" elif pct < 90: state.status_message = f"Step 3: Job execution ({int(pct)}%)" else: state.status_message = f"Step 4: Creating plots ({int(pct)}%)" _flush_state_threadsafe() # Thread-safe flush from callback thread # Call the IBM QPU get_field_values function in executor to keep UI responsive def _run_ibm_qpu(): return ibm_get_field_values( field=field_type, x=monitor_x, y=monitor_y, T=float(T), snapshot_time=snapshot_dt, nx=nx, impulse_pos=impulse_pos, shots=10000, pm_optimization_level=2, simulation="False", optimization="True", platform="IBM", progress_callback=_ibm_progress_callback, print_callback=log_to_console, ) field_values = await loop.run_in_executor(executor, _run_ibm_qpu) # Build time frames to match the output times = ibm_create_time_frames(float(T), snapshot_dt) # Build Plotly figure for the single time series import plotly.graph_objects as go fig = go.Figure() # Determine grid dimensions for label if field_type == 'Ez': gw, gh = nx, nx elif field_type == 'Hx': gw, gh = nx, nx - 1 else: gw, gh = nx - 1, nx from .utils import normalized_position_label label = normalized_position_label(monitor_x, monitor_y, gw, gh) # Color based on field type if field_type == 'Ez': color = "#d32f2f" # Red elif field_type == 'Hx': color = "#388e3c" # Green else: color = "#1976d2" # Blue fig.add_trace( go.Scatter( x=list(times), y=[float(v) for v in field_values], mode='lines+markers', name=f"{field_type} @ {label}", line=dict(color=color, width=2.5), marker=dict(size=7, symbol="circle", color=color), hovertemplate=f"{field_type} | t=%{{x:.3f}}s
Value=%{{y:.6g}}{label}", ) ) max_abs = max((abs(float(v)) for v in field_values), default=1.0) pad = 0.12 * max_abs if max_abs > 0 else 0.1 fig.update_layout( title=f"IBM QPU Time Series - {field_type} @ {label}", height=660, width=900, margin=dict(l=50, r=30, t=50, b=50), hovermode="x unified", legend=dict(orientation='h', yanchor='bottom', y=1.02, xanchor='right', x=1, title_text=""), paper_bgcolor="#FFFFFF", plot_bgcolor="#FFFFFF", ) fig.update_xaxes(title_text="Time (s)", title_font=dict(size=22), tickfont=dict(size=16), showgrid=True, gridcolor="rgba(0,0,0,.06)") fig.update_yaxes(title_text="Field Value", title_font=dict(size=22), tickfont=dict(size=16), showgrid=True, gridcolor="rgba(0,0,0,.06)") fig.update_yaxes(range=[-max_abs - pad, max_abs + pad]) # Cache the figure for export qpu_ts_cache["fig"] = fig qpu_ts_cache["times"] = list(times) qpu_ts_cache["series_map"] = {(field_type, monitor_x, monitor_y): list(field_values)} qpu_ts_cache["field"] = field_type qpu_ts_cache["unique_fields"] = [field_type] try: ctrl.qpu_ts_update(fig) except Exception: pass state.simulation_has_run = True state.run_button_text = "Successful!" state.simulation_progress = 100 state.status_message = "IBM QPU simulation completed successfully!" log_to_console("IBM QPU run completed") state.status_type = "success" state.show_progress = False _auto_hide_status_window(3.0) # Auto-hide after 3 seconds await _flush_async() # Update UI with completion status ready = bool(field_values) and len(field_values) > 0 state.qpu_ts_ready = ready state.qpu_plot_style = ( "width: 900px; height: 660px; margin: 0 auto;" if ready else "display: none; width: 900px; height: 660px; margin: 0 auto;" ) state.qpu_ts_other_ready = False state.qpu_other_plot_style = "display: none; width: 900px; height: 660px; margin: 0 auto;" # Set filter options for single result state.qpu_plot_field_options = ["All", field_type] state.qpu_plot_filter = "All" state.qpu_plot_position_options = ["All positions", label] state.qpu_plot_position_filter = "All positions" if not ready: state.error_message = "No IBM QPU time series generated. Check Δt, T, nx, and monitor position." state.status_message = "Warning: No IBM QPU time series generated." state.status_type = "warning" log_to_console("IBM QPU complete.") except Exception as e: import traceback state.error_message = f"IBM QPU run failed: {e}" state.status_message = f"IBM QPU Error: {e}" state.status_type = "error" state.show_progress = False state.run_button_text = "RUN!" state.qpu_ts_ready = False log_to_console(f"IBM QPU error: {e}") log_to_console(traceback.format_exc()) finally: state.is_running = False state.stop_button_disabled = True _stop_progress_heartbeat() executor.shutdown(wait=False) await _flush_async() return # IonQ QPU branch ionq_qpu_selected = state.backend_type == "QPU" and state.selected_qpu == "IonQ QPU" if ionq_qpu_selected: try: log_to_console("Running IonQ QPU simulation...") state.status_message = "Running IonQ QPU simulation..." state.simulation_progress = 5 await _flush_async() # Import IonQ QPU backend (same module as IBM, different platform param) try: from quantum.utils.EBU_Quantum.no_body.base_functions import get_field_values as ionq_get_field_values, create_time_frames as ionq_create_time_frames except ModuleNotFoundError: from utils.EBU_Quantum.no_body.base_functions import get_field_values as ionq_get_field_values, create_time_frames as ionq_create_time_frames # Inputs for IonQ QPU (single field, single position only!) snapshot_dt = float(state.dt_user) ix_imp, iy_imp = nearest_node_index(float(state.impulse_x), float(state.impulse_y), nx) impulse_pos = (ix_imp, iy_imp) # Get field and single position from UI field_type = (state.qpu_field_components or "Ez").strip() if field_type == "All": field_type = "Ez" log_to_console("Warning: IonQ QPU only supports single field. Defaulting to Ez.") # Parse single monitor position pts_str = str(state.qpu_monitor_gridpoints or "").strip() raw_pts = [tuple(map(int, m)) for m in re.findall(r"\((\d+)\s*,\s*(\d+)\)", pts_str)] if not raw_pts: monitor_x, monitor_y = impulse_pos log_to_console(f"No monitor position specified. Using impulse position ({monitor_x}, {monitor_y}).") else: monitor_x, monitor_y = raw_pts[0] if len(raw_pts) > 1: log_to_console(f"Warning: IonQ QPU only supports single position. Using first: ({monitor_x}, {monitor_y})") state.status_message = "Step 1: Generating circuit..." state.simulation_progress = 0 await _flush_async() def _ionq_progress_callback(pct, message=None): """Progress callback for IonQ QPU.""" state.simulation_progress = int(pct) if message: state.status_message = message elif pct < 10: state.status_message = f"Step 1: Generating circuit ({int(pct)}%)" elif pct < 60: state.status_message = f"Step 2: Optimising circuit ({int(pct)}%)" elif pct < 90: state.status_message = f"Step 3: Job execution ({int(pct)}%)" else: state.status_message = f"Step 4: Creating plots ({int(pct)}%)" _flush_state_threadsafe() # Call the IonQ QPU get_field_values function in executor def _run_ionq_qpu(): return ionq_get_field_values( field=field_type, x=monitor_x, y=monitor_y, T=float(T), snapshot_time=snapshot_dt, nx=nx, impulse_pos=impulse_pos, shots=10000, pm_optimization_level=1, # IonQ recommended simulation="False", optimization="True", platform="IONQ", # <-- Key difference from IBM progress_callback=_ionq_progress_callback, print_callback=log_to_console, ) field_values = await loop.run_in_executor(executor, _run_ionq_qpu) # Build time frames to match the output times = ionq_create_time_frames(float(T), snapshot_dt) # Build Plotly figure for the single time series import plotly.graph_objects as go fig = go.Figure() # Determine grid dimensions for label if field_type == 'Ez': gw, gh = nx, nx elif field_type == 'Hx': gw, gh = nx, nx - 1 else: gw, gh = nx - 1, nx from .utils import normalized_position_label label = normalized_position_label(monitor_x, monitor_y, gw, gh) # Color based on field type if field_type == 'Ez': color = "#d32f2f" elif field_type == 'Hx': color = "#388e3c" else: color = "#1976d2" fig.add_trace( go.Scatter( x=list(times), y=[float(v) for v in field_values], mode='lines+markers', name=f"{field_type} @ {label}", line=dict(color=color, width=2.5), marker=dict(size=7, symbol="circle", color=color), hovertemplate=f"{field_type} | t=%{{x:.3f}}s
Value=%{{y:.6g}}{label}", ) ) max_abs = max((abs(float(v)) for v in field_values), default=1.0) pad = 0.12 * max_abs if max_abs > 0 else 0.1 fig.update_layout( title=f"IonQ QPU Time Series - {field_type} @ {label}", height=660, width=900, margin=dict(l=50, r=30, t=50, b=50), hovermode="x unified", legend=dict(orientation='h', yanchor='bottom', y=1.02, xanchor='right', x=1, title_text=""), paper_bgcolor="#FFFFFF", plot_bgcolor="#FFFFFF", ) fig.update_xaxes(title_text="Time (s)", title_font=dict(size=22), tickfont=dict(size=16), showgrid=True, gridcolor="rgba(0,0,0,.06)") fig.update_yaxes(title_text="Field Value", title_font=dict(size=22), tickfont=dict(size=16), showgrid=True, gridcolor="rgba(0,0,0,.06)") fig.update_yaxes(range=[-max_abs - pad, max_abs + pad]) # Cache the figure for export qpu_ts_cache["fig"] = fig qpu_ts_cache["times"] = list(times) qpu_ts_cache["series_map"] = {(field_type, monitor_x, monitor_y): list(field_values)} qpu_ts_cache["field"] = field_type qpu_ts_cache["unique_fields"] = [field_type] try: ctrl.qpu_ts_update(fig) except Exception: pass state.simulation_has_run = True state.run_button_text = "Successful!" state.simulation_progress = 100 state.status_message = "IonQ QPU simulation completed successfully!" log_to_console("IonQ QPU run completed") state.status_type = "success" state.show_progress = False _auto_hide_status_window(3.0) await _flush_async() ready = bool(field_values) and len(field_values) > 0 state.qpu_ts_ready = ready state.qpu_plot_style = ( "width: 900px; height: 660px; margin: 0 auto;" if ready else "display: none; width: 900px; height: 660px; margin: 0 auto;" ) state.qpu_ts_other_ready = False state.qpu_other_plot_style = "display: none; width: 900px; height: 660px; margin: 0 auto;" # Set filter options for single result state.qpu_plot_field_options = ["All", field_type] state.qpu_plot_filter = "All" state.qpu_plot_position_options = ["All positions", label] state.qpu_plot_position_filter = "All positions" if not ready: state.error_message = "No IonQ QPU time series generated. Check Δt, T, nx, and monitor position." state.status_message = "Warning: No IonQ QPU time series generated." state.status_type = "warning" log_to_console("IonQ QPU complete.") except Exception as e: import traceback state.error_message = f"IonQ QPU run failed: {e}" state.status_message = f"IonQ QPU Error: {e}" state.status_type = "error" state.show_progress = False state.run_button_text = "RUN!" state.qpu_ts_ready = False log_to_console(f"IonQ QPU error: {e}") log_to_console(traceback.format_exc()) finally: state.is_running = False state.stop_button_disabled = True _stop_progress_heartbeat() executor.shutdown(wait=False) await _flush_async() return # Simulator path - run blocking simulation in executor log_to_console("Running simulation...") state.status_message = "Running simulation... This may take a while." state.simulation_progress = 30 await _flush_async() snapshot_dt = float(state.dt_user) def _stop_check(): return g.stop_simulation state.simulation_progress = 50 await _flush_async() # Run the blocking simulation in a thread pool to keep UI responsive def _run_blocking_sim(): return run_sim( nx, na, R, initial_state, T, snapshot_dt=snapshot_dt, stop_check=_stop_check, progress_callback=_progress_callback, print_callback=log_to_console ) try: sim_data, times = await loop.run_in_executor(executor, _run_blocking_sim) except Exception as e: state.error_message = f"Simulation error: {e}" state.status_message = f"Error: {e}" state.status_type = "error" state.show_progress = False state.is_running = False state.run_button_text = "RUN!" state.stop_button_disabled = True _stop_progress_heartbeat() await _flush_async() executor.shutdown(wait=False) return g.simulation_data = sim_data g.snapshot_times = times log_to_console("Simulation complete.") state.simulation_progress = 80 state.status_message = "Processing simulation results..." await _flush_async() if sim_data.size > 0: setup_surface_plot_data(sim_data, nx) state.simulation_has_run = True state.run_button_text = "Successful!" state.simulation_progress = 100 state.status_message = "Simulation completed successfully!" state.status_type = "success" state.show_progress = False _auto_hide_status_window(3.0) # Auto-hide after 3 seconds generate_plot() else: state.error_message = "Simulation produced no data. Check parameters (e.g., T > 0)." state.status_message = "Error: Simulation produced no data." state.status_type = "error" state.show_progress = False state.run_button_text = "RUN!" state.is_running = False state.stop_button_disabled = True _stop_progress_heartbeat() await _flush_async() # Cleanup executor executor.shutdown(wait=False) def reset_to_defaults(): """Reset all parameters to their default values.""" from .excitation import update_initial_state_preview, update_sim_monitor_points from . import globals as g # Stop any running simulation set_stop_simulation(True) # Reset global variables reset_globals() # Reset state to default values state.update({ "dist_type": None, "impulse_x": 0.5, "impulse_y": 0.5, "peak_pair": "(0.5, 0.5)", "mu_x": 0.5, "mu_y": 0.5, "sigma_x": 0.25, "sigma_y": 0.15, "mu_pair": "(0.5, 0.5)", "sigma_pair": "(0.25, 0.15)", "nx": None, "T": 10.0, "time_val": 0.0, "output_type": "Surface Plot", "surface_field": "Ez", "timeseries_field": "Ez", "timeseries_points": "(0.5, 0.5)", "timeseries_gridpoints": "", "timeseries_point_info": "", "error_message": "", "excitation_info_message": "", "excitation_config_open": False, "is_running": False, "simulation_has_run": False, "geometry_selection": None, "coeff_permittivity": 1.0, "coeff_permeability": 1.0, "run_button_text": "RUN!", "backend_type": None, "selected_simulator": "IBM Qiskit simulator", "selected_qpu": "IBM QPU", "stop_button_disabled": True, "export_format": "vtk", "nx_slider_index": None, "dt_user": 0.1, "temporal_warning": "", "qpu_field_components": "Ez", "qpu_monitor_gridpoints": "", "qpu_monitor_samples": "(0.5, 0.5)", "qpu_monitor_sample_info": "", "qpu_monitor_count": 0, "qpu_plot_filter": "All", "qpu_plot_field_options": ["All"], "qpu_plot_position_filter": "All positions", "qpu_plot_position_options": ["All positions"], "qpu_ts_ready": False, "qpu_plot_style": "display: none; width: 900px; height: 660px; margin: 0 auto;", "qpu_ts_other_ready": False, "qpu_other_plot_style": "display: none; width: 900px; height: 660px; margin: 0 auto;", "pyvista_view_style": "aspect-ratio: 1 / 1; width: 100%;", }) # Reset QPU cache qpu_ts_cache.update({ "times": None, "series_map": None, "field": None, "fig": None, "positions_by_field": {"All": []}, "key_to_label": {}, "label_to_keys": {}, "nx": None, }) # Ensure stop flag is cleared for next run set_stop_simulation(False) # Update monitors update_sim_monitor_points() _apply_workflow_highlights(0) # Update the preview with default values update_initial_state_preview() print("Reset to default settings") def stop_simulation_handler(): """Stop the currently running simulation.""" set_stop_simulation(True) state.status_message = "Stopping simulation..." state.status_type = "warning" log_to_console("Stopping simulation...") # --------------------------------------------------------------------------- # Grid overlay helpers for PyVista plots # --------------------------------------------------------------------------- def add_dotted_unit_grid(pl, ticks=(0.0, 0.25, 0.5, 0.75, 1.0), segments=48, gap_ratio=0.4, color="#AE8BD8", line_width=0.2): """Add a dotted unit grid (0..1) overlay in light Synopsys purple.""" import pyvista as pv try: step = 1.0 / float(max(segments, 1)) seg_len = step * float(max(0.0, min(1.0, 1.0 - gap_ratio))) pts = [] lines = [] # Horizontal dotted lines at given y=tick for y in ticks: pos = 0.0 while pos < 1.0 - 1e-9: y0, y1 = pos, min(pos + seg_len, 1.0) pts.extend([(0.0, y, 0.0), (1.0, y, 0.0)]) pts[-2] = (pos, y, 0.0) pts[-1] = (y1 if seg_len > 0 else pos, y, 0.0) i0 = len(pts) - 2 lines.extend([2, i0, i0 + 1]) pos += step # Vertical dotted lines at given x=tick for x in ticks: pos = 0.0 while pos < 1.0 - 1e-9: y0, y1 = pos, min(pos + seg_len, 1.0) pts.extend([(x, pos, 0.0), (x, y1 if seg_len > 0 else pos, 0.0)]) i0 = len(pts) - 2 lines.extend([2, i0, i0 + 1]) pos += step if pts and lines: poly = pv.PolyData(np.array(pts)) poly.lines = np.array(lines) pl.add_mesh(poly, color=color, line_width=line_width, name="dotted_unit_grid", pickable=False) except Exception: pass def add_dotted_unit_grid_scaled(pl, denom, ticks=(0.0, 0.25, 0.5, 0.75, 1.0), segments=48, gap_ratio=0.6, color="#AE8BD8", line_width=1.0, name="dotted_unit_grid_preview"): """Overlay a 0–1 dotted grid scaled to [0, denom] on the XY plane.""" import pyvista as pv from . import globals as g try: step = 1.0 / float(max(segments, 1)) seg_len = step * float(max(0.0, min(1.0, 1.0 - gap_ratio))) # Set a z slightly below mesh to avoid z-fighting try: z0 = float(g.current_mesh.points[:, 2].min()) - 1e-6 if g.current_mesh is not None else 0.0 except Exception: z0 = 0.0 pts, lines = [], [] # Vertical lines at x = t * denom for t in ticks: x = float(t) * float(denom) pos = 0.0 while pos < 1.0 - 1e-9: y0 = pos * denom y1 = min(pos + seg_len, 1.0) * denom pts.extend([(x, y0, z0), (x, y1, z0)]) i0 = len(pts) - 2 lines.extend([2, i0, i0 + 1]) pos += step # Horizontal lines at y = t * denom for t in ticks: y = float(t) * float(denom) pos = 0.0 while pos < 1.0 - 1e-9: x0 = pos * denom x1 = min(pos + seg_len, 1.0) * denom pts.extend([(x0, y, z0), (x1, y, z0)]) i0 = len(pts) - 2 lines.extend([2, i0, i0 + 1]) pos += step try: pl.remove_actor(name) except Exception: pass if pts and lines: poly = pv.PolyData(np.array(pts)) poly.lines = np.array(lines) pl.add_mesh(poly, color=color, line_width=line_width, name=name, pickable=False) except Exception: pass # --------------------------------------------------------------------------- # Simulator timeseries plot builder # --------------------------------------------------------------------------- def build_sim_timeseries_plotly(field_type: str, positions, nx: int, times, sim_data): """Build a Plotly figure for simulator timeseries data.""" import plotly.graph_objects as go from matplotlib import cm as _cm from .utils import normalized_position_label try: def _rgba_to_hex(rgba): r, g, b, a = rgba return "#%02x%02x%02x" % (int(r*255), int(g*255), int(b*255)) n_frames = int(sim_data.shape[0]) if sim_data is not None else 0 time_axis = np.asarray(times) if times is not None else np.arange(n_frames) def _dims(f): if f == 'Ez': return nx, nx if f == 'Hx': return nx, nx - 1 return nx - 1, nx # Hy def _valid_positions(f, pts): gw, gh = _dims(f) out = [] for (px, py) in pts: if 0 <= px < gw and 0 <= py < gh: out.append((int(px), int(py))) return out fig = go.Figure() if not positions or sim_data is None or n_frames == 0: fig.update_layout( title="Time Series (Simulator)", height=660, width=900, margin=dict(l=50, r=30, t=50, b=50), xaxis=dict(title="Time (s)", title_font=dict(size=22), tickfont=dict(size=16), showline=True, linewidth=1, linecolor="rgba(0,0,0,.3)", gridcolor="rgba(0,0,0,.06)", showspikes=True, spikemode='across', spikesnap='cursor'), yaxis=dict(title="Field Amplitude", title_font=dict(size=22), tickfont=dict(size=16), showline=True, linewidth=1, linecolor="rgba(0,0,0,.3)", gridcolor="rgba(0,0,0,.06)", zeroline=True, zerolinecolor="rgba(0,0,0,.25)"), hovermode="x unified", legend=dict(orientation='h', yanchor='bottom', y=1.02, xanchor='right', x=1), ) return fig max_sum = max((px + py) for (px, py) in positions) if positions else 1 if max_sum <= 0: max_sum = 1 cmap_map = { 'Ez': _cm.Reds, 'Hx': _cm.Greens, 'Hy': _cm.Blues, } def _add_field_traces(f_name: str, pts): nonlocal fig gw, gh = _dims(f_name) valid_pts = _valid_positions(f_name, pts) if not valid_pts: return 0.0, 0 max_abs_local = 0.0 num_keys = len(valid_pts) for i, (px, py) in enumerate(valid_pts): if f_name == 'Ez': values = sim_data[:, py * gw + px] elif f_name == 'Hx': block = sim_data[:, 2*nx*nx : 3*nx*nx-nx].reshape(n_frames, gh, gw) values = block[:, py, px] else: # Hy mask = np.arange(1, nx * nx + 1) % nx != 0 raw_block = sim_data[:, -nx*nx:] values = np.array([raw_block[t, mask].reshape(nx, nx - 1)[py, px] for t in range(n_frames)]) try: max_abs_local = max(max_abs_local, float(np.max(np.abs(values)))) except Exception: pass if num_keys > 1: s_index = i / (num_keys - 1) s_light = 0.3 + 0.6 * s_index else: s_light = 0.6 rgba = cmap_map.get(f_name, _cm.Blues)(s_light) color_hex = _rgba_to_hex(rgba) dash_styles = ["solid", "dash", "dot", "dashdot"] marker_symbols = ["circle", "square", "diamond", "triangle-up", "x"] label = normalized_position_label(px, py, gw, gh) fig.add_trace(go.Scatter( x=time_axis, y=values, mode='lines+markers', name=label, line=dict(color=color_hex, width=2.5, dash=dash_styles[i % len(dash_styles)]), marker=dict(size=7, symbol=marker_symbols[i % len(marker_symbols)], color=color_hex, line=dict(width=0)), hovertemplate=f"{f_name} | t=%{{x:.3f}}s
Value=%{{y:.6g}}{label}", )) return max_abs_local, len(valid_pts) max_abs = 0.0 total_traces = 0 if str(field_type) == 'All': for f in ('Ez', 'Hx', 'Hy'): m, n_tr = _add_field_traces(f, positions) max_abs = max(max_abs, m) total_traces += n_tr else: m, n_tr = _add_field_traces(str(field_type), positions) max_abs = max(max_abs, m) total_traces += n_tr title_suffix = str(field_type) if str(field_type) != 'All' else 'Ez, Hx, Hy' fig.update_layout( title=f"Time Series (Simulator: {title_suffix})", height=660, width=900, margin=dict(l=50, r=30, t=50, b=50), hovermode="x unified", legend=dict(orientation='h', yanchor='bottom', y=1.02, xanchor='right', x=1, title_text=""), paper_bgcolor="#FFFFFF", plot_bgcolor="#FFFFFF", ) fig.update_xaxes( title_text="Time (s)", title_font=dict(size=22), tickfont=dict(size=16), showgrid=True, gridcolor="rgba(95,37,159,0.08)", zeroline=False, showline=True, linewidth=1, linecolor="rgba(0,0,0,.2)", showspikes=True, spikemode='across', spikesnap='cursor' ) fig.update_yaxes( title_text="Field Amplitude", title_font=dict(size=22), tickfont=dict(size=16), showgrid=True, gridcolor="rgba(95,37,159,0.08)", zeroline=True, zerolinecolor="rgba(0,0,0,.25)", showline=True, linewidth=1, linecolor="rgba(0,0,0,.2)" ) if max_abs > 0: pad = 0.12 * max_abs fig.update_yaxes(range=[-max_abs - pad, max_abs + pad]) return fig except Exception: import plotly.graph_objects as go return go.Figure(layout=dict(height=660, width=900)) # --------------------------------------------------------------------------- # Value display for picked points on the mesh # --------------------------------------------------------------------------- def update_value_display(point): """Update value display when a point is picked on the mesh.""" from . import globals as g if g.current_mesh is None: return try: plotter.remove_actor("value_text") except Exception: pass closest_id = g.current_mesh.find_closest_point(point) if closest_id == -1: return value = g.current_mesh['scalars'][closest_id] if 'scalars' in g.current_mesh.array_names else 0.0 px, py, pz = g.current_mesh.points[closest_id] px = float(px) py = float(py) xmin, xmax, ymin, ymax, _, _ = g.current_mesh.bounds is_unit_square = (xmax <= 1.00001 and ymax <= 1.00001) if not state.simulation_has_run and is_unit_square: text = f"Position: ({px:.3f}, {py:.3f})\nValue: {value:.3e}" else: nx_val = int(state.nx) denom = max(float(nx_val - 1), 1.0) if is_unit_square: ix = int(round(px * denom)) iy = int(round(py * denom)) x_code = max(0.0, min(1.0, px)) y_code = max(0.0, min(1.0, py)) else: ix = int(round(px)) iy = int(round(py)) x_code = max(0.0, min(1.0, px / denom)) y_code = max(0.0, min(1.0, py / denom)) ix = max(0, min(ix, nx_val - 1)) iy = max(0, min(iy, nx_val - 1)) if state.simulation_has_run: time = float(state.time_val) text = f"Index: ({ix}, {iy}) | Position: ({x_code:.3f}, {y_code:.3f})\nTime: {time:.2f}s\nValue: {value:.3e}" else: text = f"Index: ({ix}, {iy}) | Position: ({x_code:.3f}, {y_code:.3f})\nValue: {value:.3e}" plotter.add_text(text, name="value_text", position="lower_left", color="black", font_size=10) ctrl.view_update() # --------------------------------------------------------------------------- # EM Job Result Upload Processing # --------------------------------------------------------------------------- def process_uploaded_em_job_result(): """ Process an IBM/IonQ EM job by retrieving it directly using the Job ID and generate a time-series plot. This function: 1. Takes the Job ID from user input 2. Connects to IBM/IonQ based on platform selection and retrieves the job 3. Extracts expectation values (evs) from Estimator results and converts them to field magnitudes 3. Builds time frames based on user-specified T and dt 4. Generates a Plotly time-series figure Note: - This pathway expects the job was submitted by this EM workflow (Estimator-based). - The job is assumed to contain one expectation value per time frame. """ import os import plotly.graph_objects as go if not state.bound: return # Validate Job ID job_id = None if getattr(state, "em_job_id", None) and str(state.em_job_id).strip(): job_id = str(state.em_job_id).strip() if job_id.endswith(".json"): job_id = job_id[:-5] if not job_id: state.em_job_upload_error = "No Job ID provided. Please enter a Job ID." return # Reset messages state.em_job_upload_error = "" state.em_job_upload_success = "" state.em_job_is_processing = True try: from .simulation import log_to_console except ImportError: def log_to_console(msg): print(msg) log_to_console(f"Processing EM job result for Job ID: {job_id}") try: # Parse parameters from UI field_type = str(state.em_job_field_type or "Ez").strip() # Parse monitor point tuple string "(x, y)" monitor_point_str = str(state.em_job_monitor_point or "(0, 0)").strip() try: # Remove parentheses and split by comma cleaned = monitor_point_str.strip("() ") parts = [p.strip() for p in cleaned.split(",")] monitor_x = int(parts[0]) if len(parts) > 0 else 0 monitor_y = int(parts[1]) if len(parts) > 1 else 0 except (ValueError, IndexError): monitor_x, monitor_y = 0, 0 total_time = float(state.em_job_total_time or 1.0) snapshot_dt = float(state.em_job_snapshot_dt or 0.1) nx = int(state.em_job_nx or 4) platform = str(state.em_job_platform or "IBM") log_to_console(f"Parameters: field={field_type}, pos=({monitor_x},{monitor_y}), T={total_time}, dt={snapshot_dt}, nx={nx}, platform={platform}") # Retrieve job results from provider field_values = [] times = [] if platform.upper() == "IBM": try: from qiskit_ibm_runtime import QiskitRuntimeService except Exception: state.em_job_upload_error = "qiskit_ibm_runtime package not available. Please install it." state.em_job_is_processing = False return try: ibm_token = os.environ.get("API_KEY_IBM_EM") if not ibm_token or not str(ibm_token).strip(): state.em_job_upload_error = "IBM API token not found. Set API_KEY_IBM_EM environment variable." state.em_job_is_processing = False return service = QiskitRuntimeService( channel="ibm_cloud", token=ibm_token, instance="crn:v1:bluemix:public:quantum-computing:us-east:a/15157e4350c04a9dab51b8b8a4a93c86:e29afd91-64bf-4a82-8dbf-731e6c213595::", ) except Exception as e: state.em_job_upload_error = f"Failed to connect to IBM Quantum: {e}" state.em_job_is_processing = False return try: job = service.job(job_id) except Exception as e: state.em_job_upload_error = f"Failed to retrieve IBM job: {e}" state.em_job_is_processing = False return try: status = job.status() status_name = status.name if hasattr(status, "name") else str(status) if status_name not in ("DONE", "COMPLETED"): state.em_job_upload_error = f"Job is not complete. Current status: {status_name}" state.em_job_is_processing = False return except Exception: pass try: # Support both shapes: # - PrimitiveResult: iterable of pubs -> pub.data.evs # - list-like result where each entry has .data.evs res = job.result() if hasattr(res, "__iter__"): for pub in res: data = getattr(pub, "data", None) evs = getattr(data, "evs", None) if data is not None else None if evs is not None: z_exp = float(np.array(evs).reshape(-1)[0]) field_values.append(float(np.sqrt((1 - z_exp) / 2))) elif hasattr(res, "data") and hasattr(res.data, "evs"): z_exp = float(np.array(res.data.evs).reshape(-1)[0]) field_values.append(float(np.sqrt((1 - z_exp) / 2))) except Exception as e: state.em_job_upload_error = f"Failed to get job results: {e}" state.em_job_is_processing = False return else: # IonQ pathway (Estimator-based in this app) try: from qiskit_ionq import IonQProvider except Exception: state.em_job_upload_error = "qiskit_ionq package not available. Please install it." state.em_job_is_processing = False return ionq_token = os.environ.get("API_KEY_IONQ_EM") if not ionq_token or not str(ionq_token).strip(): state.em_job_upload_error = "IonQ API token not found. Set API_KEY_IONQ_EM environment variable." state.em_job_is_processing = False return os.environ.setdefault("IONQ_API_TOKEN", ionq_token) try: provider = IonQProvider() job = provider.retrieve_job(job_id) except Exception as e: state.em_job_upload_error = f"Failed to retrieve IonQ job: {e}" state.em_job_is_processing = False return try: status = job.status() status_name = status.name if hasattr(status, "name") else str(status) if status_name not in ("DONE", "COMPLETED"): state.em_job_upload_error = f"Job is not complete. Current status: {status_name}" state.em_job_is_processing = False return except Exception: pass try: res = job.result() if hasattr(res, "__iter__"): for pub in res: data = getattr(pub, "data", None) evs = getattr(data, "evs", None) if data is not None else None if evs is not None: z_exp = float(np.array(evs).reshape(-1)[0]) field_values.append(float(np.sqrt((1 - z_exp) / 2))) elif hasattr(res, "data") and hasattr(res.data, "evs"): z_exp = float(np.array(res.data.evs).reshape(-1)[0]) field_values.append(float(np.sqrt((1 - z_exp) / 2))) except Exception as e: state.em_job_upload_error = f"Failed to get job results: {e}" state.em_job_is_processing = False return if not field_values: state.em_job_upload_error = "No field values extracted from job. Ensure the job was submitted by the EM Estimator workflow." state.em_job_is_processing = False return # Generate times if not provided if not times: # Use create_time_frames from delta_impulse_generator try: times = create_time_frames(total_time, snapshot_dt) except: # Fallback: generate linearly num_steps = len(field_values) times = [i * snapshot_dt for i in range(num_steps)] # Ensure times matches field_values length if len(times) != len(field_values): log_to_console(f"Warning: times ({len(times)}) != field_values ({len(field_values)}), regenerating times") num_steps = len(field_values) times = [i * snapshot_dt for i in range(num_steps)] log_to_console(f"Building time-series plot: {len(field_values)} points") # Build Plotly figure fig = go.Figure() # Determine grid dimensions for label if field_type == 'Ez': gw, gh = nx, nx elif field_type == 'Hx': gw, gh = nx, nx - 1 else: gw, gh = nx - 1, nx from .utils import normalized_position_label label = normalized_position_label(monitor_x, monitor_y, gw, gh) # Color based on field type if field_type == 'Ez': color = "#d32f2f" # Red elif field_type == 'Hx': color = "#388e3c" # Green else: color = "#1976d2" # Blue fig.add_trace( go.Scatter( x=list(times), y=[float(v) for v in field_values], mode='lines+markers', name=f"{field_type} @ {label}", line=dict(color=color, width=2.5), marker=dict(size=7, symbol="circle", color=color), hovertemplate=f"{field_type} | t=%{{x:.3f}}s
Value=%{{y:.6g}}{label}", ) ) max_abs = max((abs(float(v)) for v in field_values), default=1.0) pad = 0.12 * max_abs if max_abs > 0 else 0.1 fig.update_layout( title=f"{platform} QPU Time Series (Uploaded) - {field_type} @ {label}", height=660, width=900, margin=dict(l=50, r=30, t=50, b=50), hovermode="x unified", legend=dict(orientation='h', yanchor='bottom', y=1.02, xanchor='right', x=1, title_text=""), paper_bgcolor="#FFFFFF", plot_bgcolor="#FFFFFF", ) fig.update_xaxes(title_text="Time (s)", title_font=dict(size=22), tickfont=dict(size=16), showgrid=True, gridcolor="rgba(0,0,0,.06)") fig.update_yaxes(title_text="Field Value", title_font=dict(size=22), tickfont=dict(size=16), showgrid=True, gridcolor="rgba(0,0,0,.06)") fig.update_yaxes(range=[-max_abs - pad, max_abs + pad]) # Cache the figure for export qpu_ts_cache["fig"] = fig qpu_ts_cache["times"] = list(times) qpu_ts_cache["series_map"] = {(field_type, monitor_x, monitor_y): list(field_values)} qpu_ts_cache["field"] = field_type qpu_ts_cache["unique_fields"] = [field_type] # Update the Plotly figure widget try: ctrl.qpu_ts_update(fig) except Exception: pass # Update state state.simulation_has_run = True state.qpu_ts_ready = True state.qpu_plot_style = "width: 900px; height: 660px; margin: 0 auto;" state.qpu_plot_field_options = ["All", field_type] state.qpu_plot_filter = "All" state.qpu_plot_position_options = ["All positions", label] state.qpu_plot_position_filter = "All positions" state.em_job_upload_success = f"✓ Successfully processed {len(field_values)} time step(s) from {platform} job {job_id}" log_to_console(f"Upload processing complete: {len(field_values)} points plotted") except Exception as e: state.em_job_upload_error = f"Error processing job result: {e}" log_to_console(f"Processing error: {e}") import traceback log_to_console(traceback.format_exc()) finally: state.em_job_is_processing = False