""" QLBM Embedded Mode Module This module provides functions to build the QLBM UI into an existing Trame server, enabling single-server architecture for the unified app. Contains ALL features from qlbm.py but designed for embedded use. """ import os import numpy as np import math import pyvista as pv import plotly.graph_objects as go import tempfile import base64 import json import asyncio import threading import time as time_module from pathlib import Path from datetime import datetime from trame_vuetify.widgets import vuetify3 from trame.widgets import html from trame_plotly.widgets import plotly as plotly_widgets from pyvista.trame.ui import plotter_ui # Set offscreen before pyvista usage pv.OFF_SCREEN = True # --- Qiskit Backend Detection --- _QISKIT_BACKEND_AVAILABLE = False _QISKIT_IMPORT_ERROR = None _VISUALIZE_COUNTS_AVAILABLE = False try: from qlbm.qlbm_sample_app import ( run_sampling_sim, run_sampling_hw_ibm, run_sampling_hw_ionq, get_named_init_state_circuit, str_to_lambda, _create_slider_figure, show_initial_distribution, ) _QISKIT_BACKEND_AVAILABLE = True except ImportError as e: _QISKIT_IMPORT_ERROR = str(e) print(f"Qiskit backend not available: {e}") # Import visualize_counts for job result processing try: from qlbm.visualize_counts import ( load_samples, estimate_density, plot_density_isosurface_slider, ) _VISUALIZE_COUNTS_AVAILABLE = True except ImportError as e: print(f"visualize_counts not available: {e}") load_samples = None estimate_density = None plot_density_isosurface_slider = None # --- CUDA-Q Backend Detection --- def _env_flag(name: str) -> bool: return os.environ.get(name, "").strip().lower() in ("1", "true", "yes") def _should_disable_quantum_backend() -> str | None: """Return a reason string if quantum backend should be disabled, else None.""" if _env_flag("FORCE_CPU_DEMO"): return "FORCE_CPU_DEMO environment variable is set" if _env_flag("HUGGINGFACE_SPACE") or os.environ.get("SPACE_ID"): return "Hugging Face Spaces detected (no GPU runtime)" return None _disable_reason = _should_disable_quantum_backend() simulate_qlbm_3D_and_animate = None if _disable_reason: _SIMULATION_BACKEND_READY = False _SIMULATION_BACKEND_NOTE = f"CPU demo mode active ({_disable_reason}). Results are approximate." _SIMULATION_MODE_LABEL = "CPU demo backend" _SIMULATION_DISABLED_REASON = _disable_reason else: try: from qlbm.fluid3d_pyvista import simulate_qlbm_3D_and_animate _SIMULATION_BACKEND_READY = True _SIMULATION_BACKEND_NOTE = "" _SIMULATION_MODE_LABEL = "Quantum CUDA-Q backend" _SIMULATION_DISABLED_REASON = None except Exception as exc: simulate_qlbm_3D_and_animate = None _SIMULATION_BACKEND_READY = False _SIMULATION_BACKEND_NOTE = f"CPU demo mode active (import error: {exc}). Results are approximate." _SIMULATION_MODE_LABEL = "CPU demo backend" _SIMULATION_DISABLED_REASON = str(exc) _SIMULATION_CAN_RUN = True # CPU demo is always available _CPU_DEMO_MAX_GRID = 48 # Module-level state _server = None _state = None _ctrl = None _plotter = None _initialized = False # Global simulation data simulation_data_frames = [] simulation_times = [] current_grid_object = None # --- Async infrastructure for real-time progress updates --- _qlbm_main_loop = None # Reference to main event loop for thread-safe callbacks _qlbm_heartbeat_thread = None _qlbm_heartbeat_on = False _qlbm_sim_start_time = None def _qlbm_flush_state(): """Force state flush to browser (synchronous, for main thread use).""" try: if _server: _server.state.flush() except Exception: pass def _qlbm_flush_state_threadsafe(): """ Thread-safe state flush - schedules flush on the main event loop. Use this from background threads (e.g., inside executor callbacks). """ global _qlbm_main_loop try: if _server and _qlbm_main_loop is not None and _qlbm_main_loop.is_running(): # Schedule the flush on the main event loop _qlbm_main_loop.call_soon_threadsafe(_server.state.flush) elif _server: # Fallback: direct flush (may not work from threads) _server.state.flush() except Exception: pass async def _qlbm_flush_async(): """Async helper to flush state and yield to event loop.""" _qlbm_flush_state() await asyncio.sleep(0) # Yield control to event loop def _qlbm_start_progress_heartbeat(): """Start background thread for continuous progress updates.""" global _qlbm_heartbeat_thread, _qlbm_heartbeat_on, _qlbm_sim_start_time if _qlbm_heartbeat_thread and _qlbm_heartbeat_thread.is_alive(): return _qlbm_sim_start_time = time_module.time() def loop_fn(): global _qlbm_heartbeat_on while _qlbm_heartbeat_on: if _state is not None and _state.qlbm_is_running and _qlbm_sim_start_time is not None: elapsed = time_module.time() - _qlbm_sim_start_time # Optionally update elapsed time state here if needed _qlbm_flush_state_threadsafe() time_module.sleep(0.1) # Update every 100ms _qlbm_heartbeat_on = True _qlbm_heartbeat_thread = threading.Thread(target=loop_fn, daemon=True) _qlbm_heartbeat_thread.start() def _qlbm_stop_progress_heartbeat(): """Stop the background heartbeat thread.""" global _qlbm_heartbeat_on, _qlbm_heartbeat_thread _qlbm_heartbeat_on = False _qlbm_heartbeat_thread = None def _qlbm_auto_hide_status_window(delay_seconds=3.0): """ Schedule the QLBM status window to auto-hide after a delay. Shows the completion message briefly then closes automatically. """ def _hide_after_delay(): time_module.sleep(delay_seconds) if _state is not None: _state.qlbm_status_visible = False _qlbm_flush_state_threadsafe() hide_thread = threading.Thread(target=_hide_after_delay, daemon=True) hide_thread.start() GRID_SIZES = [8, 16, 32, 64, 128, 256] _WORKFLOW_BASE_STYLE = "font-size: 0.8rem; border: 1px solid transparent; transition: box-shadow 0.2s ease;" _WORKFLOW_HIGHLIGHT_STYLE = "font-size: 0.8rem; box-shadow: 0 0 0 2px #6200ea;" _WORKFLOW_CARD_KEYS = ["overview_card_style", "distribution_card_style", "advect_card_style", "meshing_card_style", "backend_card_style"] _PROBLEM_GEOMETRY_MAP = { "Scalar advection-diffusion in a box": "Cube", "Laminar flow & heat transfer for a heated body in water.": "Rectangular domain with a heated box (3D)", } _QLBM_PREVIEW_COLORSCALE = "Turbo" def set_server(server): """Set the server for embedded mode.""" global _server, _state, _ctrl _server = server _state = server.state _ctrl = server.controller def init_state(): """Initialize QLBM state variables with all features from qlbm.py.""" global _initialized if _initialized or _state is None: return _state.update({ # Console & Status "qlbm_console_output": "QLBM Console initialized.\n", "qlbm_status_visible": True, "qlbm_status_message": "Ready", "qlbm_status_type": "info", "qlbm_simulation_progress": 0, "qlbm_show_progress": False, # Distribution "qlbm_dist_modes": ["Sinusoidal", "Gaussian", "Multi-Dirac-Delta"], "qlbm_dist_type": None, "qlbm_nx": 32, "qlbm_show_edges": False, "qlbm_custom_dist_params": False, # Sinusoidal params "qlbm_sine_k_x": 1.0, "qlbm_sine_k_y": 1.0, "qlbm_sine_k_z": 1.0, # Gaussian params "qlbm_gauss_cx": 16.0, "qlbm_gauss_cy": 16.0, "qlbm_gauss_cz": 16.0, "qlbm_gauss_sigma": 6.0, # Multi-Dirac-Delta params (log2 of frequency multipliers) "qlbm_mdd_kx_log2": 1, "qlbm_mdd_ky_log2": 1, "qlbm_mdd_kz_log2": 1, # Problem & Geometry "qlbm_qlbm_problems": [ "Scalar advection-diffusion in a box", "Laminar flow & heat transfer for a heated body in water.", ], "qlbm_problems_selection": None, "qlbm_geometry_selection": None, "qlbm_domain_L": 1.0, "qlbm_domain_W": 1.0, "qlbm_domain_H": 1.0, # Boundary conditions "qlbm_boundary_condition": "Periodic", # Advecting fields "qlbm_advecting_field": None, "qlbm_show_advect_params": False, "qlbm_vx_expr": "0.2", "qlbm_vy_expr": "-0.15", "qlbm_vz_expr": "0.3", # Meshing "qlbm_grid_index": 2, # Index into GRID_SIZES "qlbm_grid_size": 32, "qlbm_time_steps": 10, # Backend "qlbm_backend_type": None, "qlbm_selected_simulator": None, "qlbm_selected_qpu": None, # Simulation state "qlbm_is_running": False, "qlbm_run_error": "", "qlbm_simulation_has_run": False, "qlbm_time_val": 0, "qlbm_max_time_step": 0, "qlbm_time_slider_labels": [], "qlbm_current_time_label": "0.0", # Qubit info "qlbm_qubit_grid_info": "Grid Size: 32 × 32 × 32", "qlbm_qubit_warning": "", # Backend info "qlbm_simulation_backend_ready": _SIMULATION_CAN_RUN, "qlbm_simulation_backend_note": _SIMULATION_BACKEND_NOTE, "qlbm_simulation_backend_mode": _SIMULATION_MODE_LABEL, # Workflow highlighting "qlbm_workflow_step": 0, "qlbm_overview_card_style": _WORKFLOW_BASE_STYLE, "qlbm_distribution_card_style": _WORKFLOW_BASE_STYLE, "qlbm_advect_card_style": _WORKFLOW_BASE_STYLE, "qlbm_meshing_card_style": _WORKFLOW_BASE_STYLE, "qlbm_backend_card_style": _WORKFLOW_BASE_STYLE, # Pick point text "qlbm_pick_text": "", # Qiskit backend state "qlbm_qiskit_mode": False, # True when using Qiskit backend (shows Plotly slider) "qlbm_qiskit_backend_available": _QISKIT_BACKEND_AVAILABLE, "qlbm_qiskit_fig": None, # Stores the Plotly figure for Qiskit results # Job retrieval state (for loading previously saved QPU job results) "qlbm_job_upload_error": "", # Error message for retrieval "qlbm_job_upload_success": "", # Success message for retrieval "qlbm_job_platform": "IonQ", # Platform: IonQ or IBM "qlbm_job_id": "", # Job ID text field for direct entry "qlbm_job_total_time": 3, # Total time T (generates T_list = [1..T]) "qlbm_job_output_resolution": 40, # Grid resolution for density estimation "qlbm_job_is_processing": False, # True when processing job "qlbm_job_flag_qubits": True, # Whether flag qubits were used "qlbm_job_midcircuit_meas": False, # IonQ uses False, IBM uses True }) _initialized = True def log_to_console(message): """Log a message to the QLBM console.""" if _state is None: return timestamp = datetime.now().strftime("%H:%M:%S") new_line = f"[{timestamp}] {message}\n" _state.qlbm_console_output = (_state.qlbm_console_output or "") + new_line def _set_pick_text(text): """Set the pick text for point picking.""" if _state is not None: _state.qlbm_pick_text = text def _create_plotter(): """Create and return the PyVista plotter.""" global _plotter if _plotter is None: pv.OFF_SCREEN = True _plotter = pv.Plotter() return _plotter def _ensure_point_picking(callback): """Enable point picking on the plotter.""" global _plotter if _plotter is None: return try: _plotter.enable_point_picking( callback=callback, show_message=False, use_picker=True, pickable_window=False, show_point=True, point_size=12, color="red", ) except Exception: pass # --- Workflow Highlighting --- def _determine_workflow_step(): """Determine current workflow step based on state.""" if _state is None: return 0 if not _state.qlbm_problems_selection: return 0 if not _state.qlbm_dist_type: return 1 if not _state.qlbm_advecting_field: return 2 if not _state.qlbm_backend_type: return 4 return 5 def _apply_workflow_highlights(step): """Apply highlighting to the current workflow step card.""" if _state is None: return for i, key in enumerate(_WORKFLOW_CARD_KEYS): attr = f"qlbm_{key}" if hasattr(_state, attr): setattr(_state, attr, _WORKFLOW_HIGHLIGHT_STYLE if i == step else _WORKFLOW_BASE_STYLE) # --- Qubit Info --- def update_qubit_3D_info(grid_size: int): """Generate qubit requirement plot and info strings.""" try: num_reg_qubits = int(math.log2(grid_size)) if grid_size > 0 else 3 x = np.array([16, 32, 64, 128, 256]) y = np.log2(x).astype(int) fig = go.Figure() fig.add_trace(go.Scatter(x=x, y=y, mode='lines', name='Qubits/Direction', line=dict(color='#7A3DB5', width=3))) fig.add_trace(go.Scatter(x=[grid_size], y=[num_reg_qubits], mode='markers', marker=dict(size=12, color='red'), name='Current Selection')) fig.update_layout( xaxis_title="Grid Size (Points/Direction)", yaxis_title="Qubits/Direction", width=616, height=320, margin=dict(l=40, r=20, t=20, b=40) ) grid_display = f"Grid Size: {grid_size} × {grid_size} × {grid_size}" warning = "" if grid_size > 64: warning = "⚠️ Warning: Grid sizes > 64 may exceed simulator/memory limits!" elif grid_size > 16 and _state and _state.qlbm_selected_qpu == "IBM QPU" and _state.qlbm_backend_type == "QPU": warning = "⚠️ Warning: Grid size > 16 may exceed IBM QPU capacity!" return fig, grid_display, warning except Exception: return go.Figure(), "Grid Size: N/A", "" # --- Velocity Presets --- def set_velocity_preset(preset_name): """Map velocity preset buttons to expression triplets.""" if _state is None: return mapping = { "Uniform": ("0.6", "-0.5", "1"), "Swirl": ("sin(-2*pi*z)", "1", "sin(2*pi*x)"), "Shear": ("abs(z-0.5)*4-1", "0", "0"), "TGV": ("0.5*cos(2*pi*x)*sin(2*pi*y)*sin(2*pi*z)", "-*sin(2*pi*x)*cos(2*pi*y)*sin(2*pi*z)", "0.5*sin(2*pi*x)*sin(2*pi*y)*cos(2*pi*z)"), } vx, vy, vz = mapping.get(preset_name, mapping["Uniform"]) _state.qlbm_advecting_field = preset_name _state.qlbm_vx_expr = vx _state.qlbm_vy_expr = vy _state.qlbm_vz_expr = vz def make_velocity_func(expr): """Convert a string expression into a function of (x, y, z).""" def func(x, y, z): context = { "x": x, "y": y, "z": z, "sin": np.sin, "cos": np.cos, "tan": np.tan, "pi": np.pi, "abs": np.abs, "exp": np.exp, "sqrt": np.sqrt } try: return eval(str(expr), {"__builtins__": {}}, context) except Exception as e: print(f"Error evaluating velocity expression '{expr}': {e}") return np.zeros_like(x) if isinstance(x, np.ndarray) else 0.0 return func def _safe_velocity_sample(func) -> float: try: val = func(0.5, 0.5, 0.5) if isinstance(val, np.ndarray): val = float(np.mean(val)) return float(val) except Exception: return 0.0 def build_ui(): """Build the QLBM UI into the current Trame context.""" if _state is None: raise RuntimeError("Server not set. Call set_server() first.") init_state() plotter = _create_plotter() # Register state change handlers _register_handlers() # Apply initial CSS html.Style(""" :root{ --v-theme-primary:95,37,159; } .example-img{ max-width:100%; border-radius:4px; } .warn-text{ color:#b71c1c; font-size:0.85rem; } """) # Build the UI with vuetify3.VContainer(fluid=True, classes="pa-0 fill-height"): with vuetify3.VRow(no_gutters=True, classes="fill-height"): # Left Column: Controls with vuetify3.VCol(cols=5, classes="pa-2 d-flex flex-column", style="overflow-y: auto; max-height: 200vh;"): _build_control_panels(plotter) # Right Column: Visualization with vuetify3.VCol(cols=7, classes="pa-1 d-flex flex-column"): _build_visualization_panel(plotter) # Floating status window _build_status_window() # --- Distribution Figure Functions --- def get_initial_distribution_figure(distribution_type, N, show_edges=False): """Generate a 3D Plotly figure for the initial distribution.""" if _state is None: return go.Figure() if distribution_type == "Sinusoidal": kx = max(1.0, round(float(_state.qlbm_sine_k_x))) if hasattr(_state, "qlbm_sine_k_x") else 1.0 ky = max(1.0, round(float(_state.qlbm_sine_k_y))) if hasattr(_state, "qlbm_sine_k_y") else 1.0 kz = max(1.0, round(float(_state.qlbm_sine_k_z))) if hasattr(_state, "qlbm_sine_k_z") else 1.0 selected_func = lambda x, y, z: \ np.sin(x * 2 * np.pi * kx / N) * \ np.sin(y * 2 * np.pi * ky / N) * \ np.sin(z * 2 * np.pi * kz / N) + 1 title = f"Sinusoidal Distribution (N={N})" elif distribution_type == "Gaussian": cx = _state.qlbm_gauss_cx if hasattr(_state, "qlbm_gauss_cx") else N/2 cy = _state.qlbm_gauss_cy if hasattr(_state, "qlbm_gauss_cy") else N/2 cz = _state.qlbm_gauss_cz if hasattr(_state, "qlbm_gauss_cz") else N/2 sigma = _state.qlbm_gauss_sigma if hasattr(_state, "qlbm_gauss_sigma") and _state.qlbm_gauss_sigma > 0 else 0.1 selected_func = lambda x, y, z: \ np.exp(-((x - cx)**2 / (2 * sigma**2) + (y - cy)**2 / (2 * sigma**2) + (z - cz)**2 / (2 * sigma**2))) * 1.8 + 0.2 title = f"Gaussian Distribution (N={N})" elif distribution_type == "Multi-Dirac-Delta": # Get log2 frequency multipliers from state kx_log2 = int(_state.qlbm_mdd_kx_log2) if hasattr(_state, "qlbm_mdd_kx_log2") else 1 ky_log2 = int(_state.qlbm_mdd_ky_log2) if hasattr(_state, "qlbm_mdd_ky_log2") else 1 kz_log2 = int(_state.qlbm_mdd_kz_log2) if hasattr(_state, "qlbm_mdd_kz_log2") else 1 # Number of peaks per axis num_peaks_x = 2 ** kx_log2 num_peaks_y = 2 ** ky_log2 num_peaks_z = 2 ** kz_log2 # Create a function that produces peaks at regular intervals # Peaks are located at positions: N/(2*num_peaks) + i*N/num_peaks for i in 0..num_peaks-1 def multi_dirac_func(x, y, z): # Use narrow Gaussians to approximate delta functions delta_width = max(0.5, N / (8 * max(num_peaks_x, num_peaks_y, num_peaks_z))) result = np.zeros_like(x, dtype=float) for ix in range(num_peaks_x): peak_x = (0.5 + ix) * N / num_peaks_x for iy in range(num_peaks_y): peak_y = (0.5 + iy) * N / num_peaks_y for iz in range(num_peaks_z): peak_z = (0.5 + iz) * N / num_peaks_z result += np.exp(-( (x - peak_x)**2 + (y - peak_y)**2 + (z - peak_z)**2 ) / (2 * delta_width**2)) # Normalize to range [0.2, 2.0] for visibility if result.max() > 0: result = result / result.max() * 1.8 + 0.2 return result selected_func = multi_dirac_func title = f"Multi-Dirac-Delta (kx={num_peaks_x}, ky={num_peaks_y}, kz={num_peaks_z})" else: return go.Figure() # Create 3D grid x_indices = np.linspace(0, 1, N) y_indices = np.linspace(0, 1, N) z_indices = np.linspace(0, 1, N) X, Y, Z = np.meshgrid(x_indices, y_indices, z_indices, indexing='ij') # Calculate distribution values xi = np.arange(0, N) yi = np.arange(0, N) zi = np.arange(0, N) Xi, Yi, Zi = np.meshgrid(xi, yi, zi, indexing='ij') values = selected_func(Xi, Yi, Zi) # Create Plotly visualization isomin = np.min(values) isomax = np.max(values) surface_count = 5 if distribution_type == "Sinusoidal": isomin = 0.1 isomax = 1.9 surface_count = 4 data = [go.Isosurface( x=X.flatten(), y=Y.flatten(), z=Z.flatten(), value=values.flatten(), isomin=isomin, isomax=isomax, surface_count=surface_count, colorscale=_QLBM_PREVIEW_COLORSCALE, opacity=0.35, caps=dict(x_show=False, y_show=False, z_show=False) )] # Add translucent peach unit cube to give spatial frame cube_x = [0, 1, 1, 0, 0, 1, 1, 0] cube_y = [0, 0, 1, 1, 0, 0, 1, 1] cube_z = [0, 0, 0, 0, 1, 1, 1, 1] cube_color = "rgba(255,218,185,0.25)" # Peach with transparency data.append(go.Mesh3d( x=cube_x, y=cube_y, z=cube_z, i=[7, 0, 0, 0, 4, 4, 6, 6, 4, 0, 3, 2], j=[3, 4, 1, 2, 5, 6, 5, 2, 0, 1, 6, 3], k=[0, 7, 2, 3, 6, 7, 1, 1, 5, 5, 7, 6], opacity=0.18, color=cube_color, flatshading=True, showscale=False, name="Unit Cube" )) cube_edge_x = [ 0, 1, 1, 0, 0, None, 0, 1, 1, 0, 0, None, 0, 0, None, 1, 1, None, 1, 1, None, 0, 0 ] cube_edge_y = [ 0, 0, 1, 1, 0, None, 0, 0, 1, 1, 0, None, 0, 0, None, 0, 0, None, 1, 1, None, 1, 1 ] cube_edge_z = [ 0, 0, 0, 0, 0, None, 1, 1, 1, 1, 1, None, 0, 1, None, 0, 1, None, 0, 1, None, 0, 1 ] data.append(go.Scatter3d( x=cube_edge_x, y=cube_edge_y, z=cube_edge_z, mode='lines', line=dict(color='#E3A079', width=3), opacity=0.9, name='Unit Cube Frame' )) if show_edges: # Create grid lines Y_yz, Z_yz = np.meshgrid(y_indices, z_indices, indexing='ij') Y_flat, Z_flat = Y_yz.flatten(), Z_yz.flatten() num_lines = len(Y_flat) xe = np.full(num_lines * 3, np.nan) xe[0::3], xe[1::3] = 0, 1 ye = np.full(num_lines * 3, np.nan) ye[0::3] = ye[1::3] = Y_flat ze = np.full(num_lines * 3, np.nan) ze[0::3] = ze[1::3] = Z_flat X_xz, Z_xz = np.meshgrid(x_indices, z_indices, indexing='ij') X_flat, Z_flat = X_xz.flatten(), Z_xz.flatten() num_lines = len(X_flat) xe_y = np.full(num_lines * 3, np.nan) xe_y[0::3] = xe_y[1::3] = X_flat ye_y = np.full(num_lines * 3, np.nan) ye_y[0::3], ye_y[1::3] = 0, 1 ze_y = np.full(num_lines * 3, np.nan) ze_y[0::3] = ze_y[1::3] = Z_flat X_xy, Y_xy = np.meshgrid(x_indices, y_indices, indexing='ij') X_flat, Y_flat = X_xy.flatten(), Y_xy.flatten() num_lines = len(X_flat) xe_z = np.full(num_lines * 3, np.nan) xe_z[0::3] = xe_z[1::3] = X_flat ye_z = np.full(num_lines * 3, np.nan) ye_z[0::3] = ye_z[1::3] = Y_flat ze_z = np.full(num_lines * 3, np.nan) ze_z[0::3], ze_z[1::3] = 0, 1 x_all = np.concatenate([xe, xe_y, xe_z]) y_all = np.concatenate([ye, ye_y, ye_z]) z_all = np.concatenate([ze, ze_y, ze_z]) data.append(go.Scatter3d( x=x_all, y=y_all, z=z_all, mode='lines', line=dict(color='black', width=1), opacity=0.22, name='Grid Edges' )) fig = go.Figure(data=data) fig.update_layout( title=title, scene=dict( xaxis=dict(backgroundcolor="white", showbackground=True, gridcolor="lightgrey", zerolinecolor="lightgrey", title='X'), yaxis=dict(backgroundcolor="white", showbackground=True, gridcolor="lightgrey", zerolinecolor="lightgrey", title='Y'), zaxis=dict(backgroundcolor="white", showbackground=True, gridcolor="lightgrey", zerolinecolor="lightgrey", title='Z'), ), margin=dict(l=0, r=0, b=0, t=40), width=800, height=700 ) return fig def update_view(): """Update the preview visualization.""" global current_grid_object if _state is None: return # If simulation has run, don't update the preview if _state.qlbm_simulation_has_run: return try: N = int(_state.qlbm_nx) distribution_type = _state.qlbm_dist_type show_edges = _state.qlbm_show_edges fig = get_initial_distribution_figure(distribution_type, N, show_edges) if hasattr(_ctrl, "qlbm_preview_update"): _ctrl.qlbm_preview_update(fig) except Exception as e: print(f"Error updating view: {e}") def on_pick_point(point, *_) -> None: """Handle point picking on the 3D visualization.""" global current_grid_object if point is None or current_grid_object is None: return closest_id = current_grid_object.find_closest_point(point) if closest_id == -1: return values = current_grid_object.point_data.get('scalars') if values is None: return coords = current_grid_object.points[closest_id] val = float(values[closest_id]) x, y, z = coords _set_pick_text(f"Position: ({x:.3f}, {y:.3f}, {z:.3f})\nValue: {val:.4g}") if hasattr(_ctrl, "qlbm_view_update"): _ctrl.qlbm_view_update() # --- Geometry Figure --- def get_geometry_figure(): """Generates a 3D Plotly figure for the selected geometry.""" if _state is None: return go.Figure() geom = _state.qlbm_geometry_selection if geom == "Cube": fig = _create_box_figure(1, 1, 1, "Cube") elif geom == "Rectangular domain with a heated box (3D)": try: L = float(_state.qlbm_domain_L) W = float(_state.qlbm_domain_W) H = float(_state.qlbm_domain_H) except: L, W, H = 1.0, 1.0, 1.0 max_dim = max(L, W, H) if max_dim > 0: L /= max_dim W /= max_dim H /= max_dim fig = _create_box_figure(L, W, H, "Rectangular Domain") else: fig = go.Figure() fig.update_layout( scene=dict(xaxis=dict(visible=False), yaxis=dict(visible=False), zaxis=dict(visible=False)), margin=dict(l=0, r=0, b=0, t=0), ) return fig fig.update_layout( scene=dict( xaxis=dict(visible=False), yaxis=dict(visible=False), zaxis=dict(visible=False), aspectmode='data' ), margin=dict(l=0, r=0, b=0, t=30), ) return fig def _create_box_figure(lx, ly, lz, title): """Create a 3D box figure.""" x = [0, lx, lx, 0, 0, lx, lx, 0] y = [0, 0, ly, ly, 0, 0, ly, ly] z = [0, 0, 0, 0, lz, lz, lz, lz] fig = go.Figure() fig.add_trace(go.Mesh3d( x=x, y=y, z=z, i=[7, 0, 0, 0, 4, 4, 6, 6, 4, 0, 3, 2], j=[3, 4, 1, 2, 5, 6, 5, 2, 0, 1, 6, 3], k=[0, 7, 2, 3, 6, 7, 1, 1, 5, 5, 7, 6], opacity=0.2, intensity=list(range(len(x))), colorscale=_QLBM_PREVIEW_COLORSCALE, flatshading=True, name=title, showscale=False )) xe = [0, lx, lx, 0, 0, None, 0, lx, lx, 0, 0, None, 0, 0, None, lx, lx, None, lx, lx, None, 0, 0] ye = [0, 0, ly, ly, 0, None, 0, 0, ly, ly, 0, None, 0, 0, None, 0, 0, None, ly, ly, None, ly, ly] ze = [0, 0, 0, 0, 0, None, lz, lz, lz, lz, lz, None, 0, lz, None, 0, lz, None, 0, lz, None, 0, lz] fig.add_trace(go.Scatter3d( x=xe, y=ye, z=ze, mode='lines', line=dict(color='black', width=3), showlegend=False )) fig.update_layout(title=title) return fig def update_geometry_view(): """Update the geometry visualization.""" try: fig = get_geometry_figure() if hasattr(_ctrl, "qlbm_geometry_plot_update"): _ctrl.qlbm_geometry_plot_update(fig) except Exception as e: print(f"Error updating geometry view: {e}") # --- CPU Demo Simulation --- def _cpu_distribution_field(distribution_type: str, Xi, Yi, Zi, grid_size: int, drift, phase_fraction: float): """Generate the distribution field for CPU demo simulation.""" if _state is None: return np.ones_like(Xi) if distribution_type == "Sinusoidal": kx = max(1.0, round(float(_state.qlbm_sine_k_x))) if hasattr(_state, "qlbm_sine_k_x") else 1.0 ky = max(1.0, round(float(_state.qlbm_sine_k_y))) if hasattr(_state, "qlbm_sine_k_y") else 1.0 kz = max(1.0, round(float(_state.qlbm_sine_k_z))) if hasattr(_state, "qlbm_sine_k_z") else 1.0 x_term = np.sin((np.mod(Xi + drift[0], grid_size)) * 2 * np.pi * kx / grid_size) y_term = np.sin((np.mod(Yi + drift[1], grid_size)) * 2 * np.pi * ky / grid_size) z_term = np.sin((np.mod(Zi + drift[2], grid_size)) * 2 * np.pi * kz / grid_size) field = x_term * y_term * z_term + 1.0 else: # Gaussian nx_val = max(1.0, float(_state.qlbm_nx)) if hasattr(_state, "qlbm_nx") else float(grid_size) cx = float(_state.qlbm_gauss_cx) if hasattr(_state, "qlbm_gauss_cx") else nx_val / 2 cy = float(_state.qlbm_gauss_cy) if hasattr(_state, "qlbm_gauss_cy") else nx_val / 2 cz = float(_state.qlbm_gauss_cz) if hasattr(_state, "qlbm_gauss_cz") else nx_val / 2 sigma = float(_state.qlbm_gauss_sigma) if hasattr(_state, "qlbm_gauss_sigma") else nx_val / 6 scale = (grid_size - 1) / nx_val if nx_val else 1.0 cx = cx * scale + drift[0] cy = cy * scale + drift[1] cz = cz * scale + drift[2] sigma = max(1.0, sigma * scale) field = np.exp(-(((Xi - cx) ** 2 + (Yi - cy) ** 2 + (Zi - cz) ** 2) / (2 * sigma ** 2))) * 1.8 + 0.2 modulation = 0.15 * np.sin(2 * np.pi * phase_fraction + (Xi + Yi + Zi) * np.pi / max(1, grid_size)) return field + modulation def _run_cpu_demo_simulation(grid_size: int, T: int, distribution_type: str, vx_func, vy_func, vz_func, progress_callback=None): """Run CPU demo simulation.""" grid_size = int(max(8, min(grid_size, _CPU_DEMO_MAX_GRID))) idx_coords = np.linspace(0, grid_size - 1, grid_size, dtype=np.float32) Xi, Yi, Zi = np.meshgrid(idx_coords, idx_coords, idx_coords, indexing='ij') geom_coords = np.linspace(0, 1, grid_size, dtype=np.float32) Xg, Yg, Zg = np.meshgrid(geom_coords, geom_coords, geom_coords, indexing='ij') if T <= 0: target = 1.0 else: target = float(T) num_frames = min(30, max(2, int(min(target, 20)) + 1)) timeline = list(np.linspace(0.0, target, num_frames)) if len(timeline) < 2: timeline.append(target) vx = _safe_velocity_sample(vx_func) vy = _safe_velocity_sample(vy_func) vz = _safe_velocity_sample(vz_func) drift_scale = 0.25 * grid_size frames = [] for idx, t_val in enumerate(timeline): phase_fraction = idx / (len(timeline) - 1) if len(timeline) > 1 else 0.0 drift = ( vx * phase_fraction * drift_scale, vy * phase_fraction * drift_scale, vz * phase_fraction * drift_scale, ) field = _cpu_distribution_field(distribution_type, Xi, Yi, Zi, grid_size, drift, phase_fraction) frames.append(field.astype(np.float32)) if progress_callback: percent = int(((idx + 1) / len(timeline)) * 100) progress_callback(percent) grid = pv.StructuredGrid() grid.points = np.column_stack((Xg.ravel(), Yg.ravel(), Zg.ravel())) grid.dimensions = [grid_size, grid_size, grid_size] grid["scalars"] = frames[0].ravel() times = [float(t) for t in timeline] return frames, times, grid # --- Export Functions --- def export_simulation_vtk(): """Download the current simulation volume as a VTK file.""" global current_grid_object if not _state.qlbm_simulation_has_run or current_grid_object is None: log_to_console("VTK export unavailable: run a simulation first.") return temp_path = None try: suffix = datetime.now().strftime("%Y%m%d_%H%M%S") grid_size = int(_state.qlbm_grid_size or 0) filename = f"qlbm_volume_n{grid_size}_{suffix}.vts" tmp = tempfile.NamedTemporaryFile(suffix=".vts", delete=False) tmp.close() temp_path = Path(tmp.name) current_grid_object.save(str(temp_path)) _server.controller.download_file(temp_path.read_bytes(), filename) log_to_console(f"Exported VTK to {filename}") except Exception as exc: log_to_console(f"VTK export failed: {exc}") finally: if temp_path and temp_path.exists(): try: temp_path.unlink() except Exception: pass def export_simulation_mp4(): """Render the simulation frames to an MP4 animation for download.""" global simulation_data_frames, current_grid_object if not _state.qlbm_simulation_has_run or not simulation_data_frames: log_to_console("MP4 export unavailable: run a simulation first.") return if current_grid_object is None: log_to_console("MP4 export failed: missing grid data.") return temp_path = None movie_plotter = None try: suffix = datetime.now().strftime("%Y%m%d_%H%M%S") grid_size = int(_state.qlbm_grid_size or 0) filename = f"qlbm_animation_n{grid_size}_{suffix}.mp4" tmp = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) tmp.close() temp_path = Path(tmp.name) movie_plotter = pv.Plotter(off_screen=True, window_size=(1280, 720)) try: camera_position = _plotter.camera_position if _plotter and _plotter.camera_position else None except Exception: camera_position = None base_grid = current_grid_object.copy() movie_plotter.open_movie(str(temp_path), framerate=15) for frame_data in simulation_data_frames: base_grid["scalars"] = np.asarray(frame_data).ravel() iso_mesh = base_grid.contour(isosurfaces=7, scalars="scalars") movie_plotter.clear() movie_plotter.add_mesh( iso_mesh, cmap="Blues", opacity=0.35, show_scalar_bar=False, ) movie_plotter.add_axes() if camera_position: try: movie_plotter.camera_position = camera_position except Exception: pass else: movie_plotter.view_isometric() movie_plotter.render() movie_plotter.write_frame() movie_plotter.close() movie_plotter = None _server.controller.download_file(temp_path.read_bytes(), filename) log_to_console(f"Exported MP4 to {filename}") except Exception as exc: log_to_console(f"MP4 export failed: {exc}") finally: if movie_plotter is not None: try: movie_plotter.close() except Exception: pass if temp_path and temp_path.exists(): try: temp_path.unlink() except Exception: pass # --- Qiskit Simulation Functions --- def _map_state_to_qiskit_params(): """ Map qlbm_embedded state variables to qlbm_sample_app parameters. Returns ------- dict or None Dictionary of parameters for run_sampling_sim, or None if state is unavailable """ if _state is None: return None # Map distribution type dist_type = _state.qlbm_dist_type if dist_type == "Sinusoidal": init_state_name = "sin" elif dist_type == "Gaussian": init_state_name = "gaussian" elif dist_type == "Multi-Dirac-Delta": init_state_name = "multi_dirac_delta" else: init_state_name = "sin" # Default # Calculate n from grid_size (grid_size = 2^n) grid_size = int(_state.qlbm_grid_size) n = int(math.log2(grid_size)) if grid_size > 0 else 3 # Map Gaussian parameters from grid units to normalized [0,1] # In the UI, gauss_cx/cy/cz are in grid units (0 to nx) # qlbm_sample_app expects normalized [0,1] nx = float(_state.qlbm_nx) if _state.qlbm_nx else float(grid_size) gauss_cx = float(_state.qlbm_gauss_cx) / nx if nx > 0 else 0.5 gauss_cy = float(_state.qlbm_gauss_cy) / nx if nx > 0 else 0.5 gauss_cz = float(_state.qlbm_gauss_cz) / nx if nx > 0 else 0.5 gauss_sigma = float(_state.qlbm_gauss_sigma) / nx if nx > 0 else 0.2 # Create T_list from time_steps: [1, 2, 3, ..., T] time_steps = int(_state.qlbm_time_steps) if time_steps <= 0: T_list = [1] else: T_list = list(range(1, time_steps + 1)) return { "n": n, "init_state_name": init_state_name, "sine_k_x": float(_state.qlbm_sine_k_x), "sine_k_y": float(_state.qlbm_sine_k_y), "sine_k_z": float(_state.qlbm_sine_k_z), "gauss_cx": gauss_cx, "gauss_cy": gauss_cy, "gauss_cz": gauss_cz, "gauss_sigma": gauss_sigma, "mdd_kx_log2": int(_state.qlbm_mdd_kx_log2) if hasattr(_state, "qlbm_mdd_kx_log2") else 1, "mdd_ky_log2": int(_state.qlbm_mdd_ky_log2) if hasattr(_state, "qlbm_mdd_ky_log2") else 1, "mdd_kz_log2": int(_state.qlbm_mdd_kz_log2) if hasattr(_state, "qlbm_mdd_kz_log2") else 1, "vx_expr": str(_state.qlbm_vx_expr), "vy_expr": str(_state.qlbm_vy_expr), "vz_expr": str(_state.qlbm_vz_expr), "T_list": T_list, "grid_size": grid_size, } def _run_qiskit_simulation(progress_callback=None): """ Run QLBM simulation using Qiskit Aer statevector simulator. Parameters ---------- progress_callback : callable, optional Function to report progress (0-100) Returns ------- output : list[ndarray] List of 3D density arrays, one per timestep fig : go.Figure Plotly figure with slider animation T_list : list[int] List of timesteps """ if not _QISKIT_BACKEND_AVAILABLE: raise RuntimeError(f"Qiskit backend not available: {_QISKIT_IMPORT_ERROR}") params = _map_state_to_qiskit_params() if params is None: raise RuntimeError("Failed to map state parameters") log_to_console(f"Qiskit Simulation Parameters:") log_to_console(f" n={params['n']} (grid {params['grid_size']}³)") log_to_console(f" T_list={params['T_list']}") log_to_console(f" Distribution: {params['init_state_name']}") log_to_console(f" Velocity: vx={params['vx_expr']}, vy={params['vy_expr']}, vz={params['vz_expr']}") if progress_callback: progress_callback(5) # Create initial state circuit using qlbm_sample_app function log_to_console("Creating initial state circuit...") init_state_prep_circ = get_named_init_state_circuit( n=params["n"], init_state_name=params["init_state_name"], sine_k_x=params["sine_k_x"], sine_k_y=params["sine_k_y"], sine_k_z=params["sine_k_z"], gauss_cx=params["gauss_cx"], gauss_cy=params["gauss_cy"], gauss_cz=params["gauss_cz"], gauss_sigma=params["gauss_sigma"], mdd_kx_log2=params["mdd_kx_log2"], mdd_ky_log2=params["mdd_ky_log2"], mdd_kz_log2=params["mdd_kz_log2"], ) if progress_callback: progress_callback(15) log_to_console("Running Qiskit Aer statevector simulation...") log_to_console(f" Processing {len(params['T_list'])} timestep(s)...") # Determine velocity resolution (cap for performance) vel_resolution = min(params['grid_size'], 32) # Define a progress wrapper to map 0-100% simulation progress to 15-95% overall progress def sim_progress_wrapper(p): if progress_callback: # Map 0-100 -> 15-95 weighted_p = 15 + (p / 100.0) * (95 - 15) progress_callback(weighted_p) # Run simulation using qlbm_sample_app function output, fig = run_sampling_sim( n=params["n"], ux=params["vx_expr"], uy=params["vy_expr"], uz=params["vz_expr"], init_state_prep_circ=init_state_prep_circ, T_list=params["T_list"], vel_resolution=vel_resolution, progress_callback=sim_progress_wrapper, ) if progress_callback: progress_callback(95) log_to_console(f"Simulation complete: {len(output)} frame(s) generated") return output, fig, params["T_list"] # --- Job Result Upload Processing --- def process_uploaded_job_result(): """ Process an IBM or IonQ job by retrieving it directly using the Job ID. This function: 1. Takes the Job ID from user input (or extracts from uploaded filename) 2. Connects to IBM/IonQ based on platform selection and retrieves the job 3. Processes the job results (IBM: job.result(), IonQ: job.get_counts(i)) 4. Calls load_samples/estimate_density for each timestep 5. Generates the slider figure using plot_density_isosurface_slider """ global simulation_data_frames, simulation_times, current_grid_object if _state is None: return # Validate required imports if not _VISUALIZE_COUNTS_AVAILABLE: _state.qlbm_job_upload_error = "visualize_counts module not available. Cannot process job results." log_to_console("Error: visualize_counts module not available") return # Get job ID from text field job_id = None if _state.qlbm_job_id and str(_state.qlbm_job_id).strip(): job_id = str(_state.qlbm_job_id).strip() # Remove .json extension if present if job_id.endswith(".json"): job_id = job_id[:-5] log_to_console(f"Using Job ID from text field: {job_id}") if not job_id: _state.qlbm_job_upload_error = "No Job ID provided. Please enter a Job ID." return # Get platform selection platform = _state.qlbm_job_platform or "IonQ" # Reset messages _state.qlbm_job_upload_error = "" _state.qlbm_job_upload_success = "" _state.qlbm_job_is_processing = True log_to_console(f"Processing {platform} Job ID: {job_id}") try: # Parse timesteps from user input try: total_time = int(_state.qlbm_job_total_time or 3) if total_time < 1: total_time = 1 T_list = list(range(1, total_time + 1)) except ValueError: _state.qlbm_job_upload_error = "Invalid Total Time. Please enter a positive integer." _state.qlbm_job_is_processing = False return log_to_console(f"Timesteps to process: {T_list}") # Get processing parameters output_resolution = int(_state.qlbm_job_output_resolution or 40) # Platform-specific parameters if platform == "IBM": flag_qubits = True midcircuit_meas = True # IBM uses midcircuit_meas=True else: # IonQ flag_qubits = True midcircuit_meas = False # IonQ uses midcircuit_meas=False log_to_console(f"Platform: {platform}, Resolution: {output_resolution}, Flag qubits: {flag_qubits}, Midcircuit meas: {midcircuit_meas}") output = [] if platform == "IBM": # === IBM Job Retrieval === log_to_console("Connecting to IBM Quantum...") try: from qiskit_ibm_runtime import QiskitRuntimeService except ImportError: _state.qlbm_job_upload_error = "qiskit_ibm_runtime package not available. Please install it." _state.qlbm_job_is_processing = False log_to_console("Error: qiskit_ibm_runtime not installed") return # Get API token from environment ibm_token = os.environ.get("API_KEY_IBM_QLBM") if not ibm_token: _state.qlbm_job_upload_error = "IBM API token not found. Set API_KEY_IBM_QLBM environment variable." _state.qlbm_job_is_processing = False log_to_console("Error: IBM API token not found in environment") return # Set up IBM service (same as run_sampling_hw_ibm) try: service = QiskitRuntimeService( channel="ibm_cloud", token=ibm_token, instance="crn:v1:bluemix:public:quantum-computing:us-east:a/15157e4350c04a9dab51b8b8a4a93c86:e29afd91-64bf-4a82-8dbf-731e6c213595::", ) log_to_console("Connected to IBM Quantum service") except Exception as e: _state.qlbm_job_upload_error = f"Failed to connect to IBM Quantum: {e}" _state.qlbm_job_is_processing = False log_to_console(f"Error connecting to IBM: {e}") return # Retrieve the job log_to_console(f"Retrieving IBM job: {job_id}") try: job = service.job(job_id) except Exception as e: _state.qlbm_job_upload_error = f"Failed to retrieve IBM job: {e}" _state.qlbm_job_is_processing = False log_to_console(f"Error retrieving job: {e}") return # Check job status try: status = job.status() status_name = status.name if hasattr(status, 'name') else str(status) log_to_console(f"Job status: {status_name}") if status_name not in ('DONE', 'COMPLETED'): _state.qlbm_job_upload_error = f"Job is not complete. Current status: {status_name}" _state.qlbm_job_is_processing = False return except Exception as e: log_to_console(f"Warning: Could not check job status: {e}") # Get results (same as run_sampling_hw_ibm) log_to_console("Retrieving IBM job results...") try: result = job.result() log_to_console("Results retrieved successfully") except Exception as e: _state.qlbm_job_upload_error = f"Failed to get job results: {e}" _state.qlbm_job_is_processing = False log_to_console(f"Error getting results: {e}") return # Process results (same pattern as run_sampling_hw_ibm) log_to_console("Processing IBM job results...") for idx, (T_total, pub) in enumerate(zip(T_list, result)): try: log_to_console(f"Processing timestep T={T_total} (circuit {idx})...") # Get counts (same as run_sampling_hw_ibm) try: joined = pub.join_data() counts = joined.get_counts() except Exception as e: log_to_console(f"Error retrieving counts for T={T_total}: {e}") continue log_to_console(f" Retrieved {len(counts)} unique bitstrings") # Debug: show a few sample bitstrings sample_count = 0 for bs, cnt in counts.items(): if sample_count < 3: log_to_console(f" Sample: {bs} (count={cnt})") sample_count += 1 # Process samples (same as run_sampling_hw_ibm) pts, processed_counts = load_samples( counts, T_total, logger=log_to_console, flag_qubits=flag_qubits, midcircuit_meas=midcircuit_meas ) log_to_console(f" load_samples returned {len(pts)} valid sample points") # Estimate density density = estimate_density(pts, processed_counts, bandwidth=0.05, grid_size=output_resolution) output.append(density) except Exception as e: log_to_console(f"Error processing timestep {idx}: {e}") import traceback log_to_console(traceback.format_exc()) else: # === IonQ Job Retrieval === log_to_console("Connecting to IonQ...") try: from qiskit_ionq import IonQProvider except ImportError: _state.qlbm_job_upload_error = "qiskit_ionq package not available. Please install it." _state.qlbm_job_is_processing = False log_to_console("Error: qiskit_ionq not installed") return # Get API token from environment (same pattern as run_sampling_hw_ionq) ionq_token = os.environ.get("API_KEY_IONQ_QLBM") or os.environ.get("IONQ_API_TOKEN") if not ionq_token: _state.qlbm_job_upload_error = "IonQ API token not found. Set API_KEY_IONQ_QLBM environment variable." _state.qlbm_job_is_processing = False log_to_console("Error: IonQ API token not found in environment") return # Set the IONQ_API_TOKEN env var so IonQProvider() can find it (same as run_sampling_hw_ionq) os.environ.setdefault("IONQ_API_TOKEN", ionq_token) # Set up the IonQ provider and backend (IonQProvider reads from IONQ_API_TOKEN env var) provider = IonQProvider() backend = provider.get_backend("qpu.forte-enterprise-1") backend_name = backend.name if isinstance(backend.name, str) else backend.name() log_to_console(f"Connected to IonQ backend: {backend_name}") # Retrieve the job log_to_console(f"Retrieving IonQ job: {job_id}") try: job = backend.retrieve_job(job_id) except Exception as e: _state.qlbm_job_upload_error = f"Failed to retrieve IonQ job: {e}" _state.qlbm_job_is_processing = False log_to_console(f"Error retrieving job: {e}") return # Check job status try: status = job.status() status_name = status.name if hasattr(status, 'name') else str(status) log_to_console(f"Job status: {status_name}") if status_name not in ('DONE', 'COMPLETED'): _state.qlbm_job_upload_error = f"Job is not complete. Current status: {status_name}" _state.qlbm_job_is_processing = False return except Exception as e: log_to_console(f"Warning: Could not check job status: {e}") # Process results (same as run_sampling_hw_ionq) log_to_console("Processing IonQ job results...") for i, T_total in enumerate(T_list): try: log_to_console(f"Processing timestep T={T_total} (circuit {i})...") # Get counts directly from job (same as run_sampling_hw_ionq) counts = job.get_counts(i) log_to_console(f" Retrieved {len(counts)} unique bitstrings") # Debug: show a few sample bitstrings sample_count = 0 for bs, cnt in counts.items(): if sample_count < 3: log_to_console(f" Sample: {bs} (count={cnt})") sample_count += 1 # Process samples (same as run_sampling_hw_ionq) pts, processed_counts = load_samples( counts, T_total, logger=log_to_console, flag_qubits=flag_qubits, midcircuit_meas=midcircuit_meas ) log_to_console(f" load_samples returned {len(pts)} valid sample points") # Estimate density density = estimate_density(pts, processed_counts, bandwidth=0.05, grid_size=output_resolution) output.append(density) except IndexError: log_to_console(f"Warning: No data found for timestep T={T_total} (circuit {i})") break except Exception as e: log_to_console(f"Error processing timestep {i}: {e}") import traceback log_to_console(traceback.format_exc()) if not output: _state.qlbm_job_upload_error = "No valid data extracted from job. Check timesteps parameter." _state.qlbm_job_is_processing = False return log_to_console(f"Processed {len(output)} timestep(s) successfully") # Generate the Plotly figure fig = plot_density_isosurface_slider(output, T_list[:len(output)]) # Update state to show results _state.qlbm_qiskit_mode = True _state.qlbm_qiskit_fig = fig _state.qlbm_simulation_has_run = True _state.qlbm_job_upload_success = f"✓ Successfully processed {len(output)} timestep(s) from {platform} job {job_id}" # Update the Plotly figure widget if hasattr(_ctrl, "qlbm_qiskit_result_update"): _ctrl.qlbm_qiskit_result_update(fig) log_to_console(f"Results ready! {len(output)} frames generated.") except Exception as e: _state.qlbm_job_upload_error = f"Error processing job: {e}" log_to_console(f"Processing error: {e}") import traceback log_to_console(traceback.format_exc()) finally: _state.qlbm_job_is_processing = False # --- Main Simulation --- def run_simulation(): """ Entry point for simulation - launches the async worker. This is called by the UI button click and schedules the async task. """ if _server is None: log_to_console("Error: Server not available") return # Schedule the async simulation asyncio.ensure_future(_run_simulation_async()) async def _run_simulation_async(): """ Async simulation runner that uses thread pool for blocking work. This allows the UI to update in real-time during simulation. """ global simulation_data_frames, simulation_times, current_grid_object, _plotter, _qlbm_main_loop from concurrent.futures import ThreadPoolExecutor # Capture the main event loop for thread-safe callbacks _qlbm_main_loop = asyncio.get_event_loop() # Create executor for blocking operations executor = ThreadPoolExecutor(max_workers=1) loop = _qlbm_main_loop if not _SIMULATION_CAN_RUN: msg = _SIMULATION_DISABLED_REASON or "Simulation backend is not available on this platform." _state.qlbm_run_error = msg log_to_console(f"Error: {msg}") _state.qlbm_status_message = "Error: Backend unavailable" _state.qlbm_status_type = "error" await _qlbm_flush_async() executor.shutdown(wait=False) return _state.qlbm_is_running = True _state.qlbm_run_error = "" _state.qlbm_simulation_has_run = False _state.qlbm_qiskit_mode = False # Reset Qiskit mode _state.qlbm_show_progress = True _state.qlbm_simulation_progress = 0 _state.qlbm_status_message = "Initializing simulation..." _state.qlbm_status_type = "info" await _qlbm_flush_async() # Start heartbeat for continuous progress updates _qlbm_start_progress_heartbeat() # Determine if using Qiskit backend use_qiskit = ( _state.qlbm_backend_type == "Simulator" and _state.qlbm_selected_simulator == "IBM Qiskit simulator" and _QISKIT_BACKEND_AVAILABLE ) use_ibm_qpu = ( _state.qlbm_backend_type == "QPU" and _state.qlbm_selected_qpu == "IBM QPU" and _QISKIT_BACKEND_AVAILABLE ) use_ionq_qpu = ( _state.qlbm_backend_type == "QPU" and _state.qlbm_selected_qpu == "IonQ QPU" and _QISKIT_BACKEND_AVAILABLE ) # Log initial configuration backend_info = f"{_state.qlbm_backend_type}" if _state.qlbm_backend_type == "Simulator": backend_info += f" - {_state.qlbm_selected_simulator}" elif _state.qlbm_backend_type == "QPU": backend_info += f" - {_state.qlbm_selected_qpu}" log_to_console("Job Initiated") log_to_console(f"Grid Size: {_state.qlbm_grid_size}×{_state.qlbm_grid_size}×{_state.qlbm_grid_size}, Time Steps: {_state.qlbm_time_steps}, Distribution: {_state.qlbm_dist_type}, Boundary: {_state.qlbm_boundary_condition}, Backend: {backend_info}, Velocity: vx={_state.qlbm_vx_expr}, vy={_state.qlbm_vy_expr}, vz={_state.qlbm_vz_expr}") # Progress callback that uses thread-safe flush for real-time updates last_logged_percent = [0] # Use list for nonlocal in nested function def _progress_callback(percent): _state.qlbm_simulation_progress = percent if percent - last_logged_percent[0] >= 10: log_to_console(f"Simulation progress: {int(percent)}%") last_logged_percent[0] = percent _qlbm_flush_state_threadsafe() # Thread-safe flush! # QPU progress callback with status message support def _qpu_progress_callback(percent, message=None): _state.qlbm_simulation_progress = percent if message: _state.qlbm_status_message = message _qlbm_flush_state_threadsafe() try: # === Qiskit Backend (IBM Qiskit Simulator) === if use_qiskit: log_to_console("Using IBM Qiskit Simulator backend...") _state.qlbm_status_message = "Running Qiskit Aer simulation..." await _qlbm_flush_async() # Run Qiskit simulation in executor to keep UI responsive def _run_qiskit_blocking(): return _run_qiskit_simulation(progress_callback=_progress_callback) output, plotly_fig, T_list = await loop.run_in_executor(executor, _run_qiskit_blocking) # Store results simulation_data_frames = output simulation_times = [float(t) for t in T_list] # Update the Plotly figure widget for Qiskit results if hasattr(_ctrl, "qlbm_qiskit_result_update"): _ctrl.qlbm_qiskit_result_update(plotly_fig) _state.qlbm_max_time_step = len(output) - 1 _state.qlbm_time_val = 0 _state.qlbm_time_slider_labels = [f"T={t}" for t in T_list] _state.qlbm_simulation_has_run = True _state.qlbm_qiskit_mode = True # Use Plotly display instead of PyVista _state.qlbm_simulation_progress = 100 log_to_console("Qiskit simulation completed successfully.") _state.qlbm_status_message = "Simulation completed successfully." _state.qlbm_status_type = "success" _state.qlbm_show_progress = False _qlbm_auto_hide_status_window(3.0) # Auto-hide after 3 seconds await _qlbm_flush_async() # === IBM QPU Backend === elif use_ibm_qpu: log_to_console("Using IBM QPU backend...") _state.qlbm_status_message = "Step 1: Preparing IBM QPU job..." _state.qlbm_simulation_progress = 0 await _qlbm_flush_async() params = _map_state_to_qiskit_params() if params is None: raise RuntimeError("Failed to map state parameters") # Create initial state circuit (part of Step 1) log_to_console("Creating initial state circuit...") _state.qlbm_simulation_progress = 2 await _qlbm_flush_async() init_state_prep_circ = get_named_init_state_circuit( n=params["n"], init_state_name=params["init_state_name"], sine_k_x=params["sine_k_x"], sine_k_y=params["sine_k_y"], sine_k_z=params["sine_k_z"], gauss_cx=params["gauss_cx"], gauss_cy=params["gauss_cy"], gauss_cz=params["gauss_cz"], gauss_sigma=params["gauss_sigma"], mdd_kx_log2=params["mdd_kx_log2"], mdd_ky_log2=params["mdd_ky_log2"], mdd_kz_log2=params["mdd_kz_log2"], ) _state.qlbm_simulation_progress = 5 _state.qlbm_status_message = "Step 1: Circuit generation..." await _qlbm_flush_async() # Run HW simulation in executor with progress callback def _run_ibm_qpu_blocking(): job, get_result = run_sampling_hw_ibm( n=params["n"], ux=params["vx_expr"], uy=params["vy_expr"], uz=params["vz_expr"], init_state_prep_circ=init_state_prep_circ, T_list=params["T_list"], shots=2**14, vel_resolution=min(params['grid_size'], 32), output_resolution=min(2*params['grid_size'], 40), logger=log_to_console, progress_callback=_qpu_progress_callback, ) # get_result already handles progress updates internally output, plotly_fig = get_result(job) return output, plotly_fig, init_state_prep_circ output, plotly_fig, init_state_prep_circ = await loop.run_in_executor(executor, _run_ibm_qpu_blocking) # Step 3: Finalizing results (T=0 snapshot removed - only show T=1 onwards) _state.qlbm_simulation_progress = 92 _state.qlbm_status_message = "Step 3: Finalizing results..." await _qlbm_flush_async() # Use T_list directly (no T=0 prepend) result_T_list = list(params["T_list"]) log_to_console(f"Results available for T={result_T_list}") # Store results simulation_data_frames = output simulation_times = [float(t) for t in result_T_list] # Update UI if hasattr(_ctrl, "qlbm_qiskit_result_update"): _ctrl.qlbm_qiskit_result_update(plotly_fig) _state.qlbm_max_time_step = len(output) - 1 _state.qlbm_time_val = 0 _state.qlbm_time_slider_labels = [f"T={t}" for t in result_T_list] _state.qlbm_simulation_has_run = True _state.qlbm_qiskit_mode = True _state.qlbm_simulation_progress = 100 log_to_console("IBM QPU simulation completed successfully.") _state.qlbm_status_message = "Simulation completed successfully." _state.qlbm_status_type = "success" _state.qlbm_show_progress = False _qlbm_auto_hide_status_window(3.0) # Auto-hide after 3 seconds await _qlbm_flush_async() # === IonQ QPU Backend === elif use_ionq_qpu: log_to_console("Using IonQ QPU backend...") _state.qlbm_status_message = "Step 1: Preparing IonQ QPU job..." _state.qlbm_simulation_progress = 0 await _qlbm_flush_async() params = _map_state_to_qiskit_params() if params is None: raise RuntimeError("Failed to map state parameters") # Create initial state circuit (part of Step 1) log_to_console("Creating initial state circuit...") _state.qlbm_simulation_progress = 2 await _qlbm_flush_async() init_state_prep_circ = get_named_init_state_circuit( n=params["n"], init_state_name=params["init_state_name"], sine_k_x=params["sine_k_x"], sine_k_y=params["sine_k_y"], sine_k_z=params["sine_k_z"], gauss_cx=params["gauss_cx"], gauss_cy=params["gauss_cy"], gauss_cz=params["gauss_cz"], gauss_sigma=params["gauss_sigma"], mdd_kx_log2=params["mdd_kx_log2"], mdd_ky_log2=params["mdd_ky_log2"], mdd_kz_log2=params["mdd_kz_log2"], ) _state.qlbm_simulation_progress = 5 _state.qlbm_status_message = "Step 1: Circuit generation..." await _qlbm_flush_async() # Run IonQ HW simulation in executor with progress callback def _run_ionq_qpu_blocking(): job, get_result = run_sampling_hw_ionq( n=params["n"], ux=params["vx_expr"], uy=params["vy_expr"], uz=params["vz_expr"], init_state_prep_circ=init_state_prep_circ, T_list=params["T_list"], shots=2**14, vel_resolution=min(params['grid_size'], 32), output_resolution=min(2*params['grid_size'], 40), logger=log_to_console, progress_callback=_qpu_progress_callback, ) # get_result already handles progress updates internally output, plotly_fig = get_result(job) return output, plotly_fig, init_state_prep_circ output, plotly_fig, init_state_prep_circ = await loop.run_in_executor(executor, _run_ionq_qpu_blocking) # Step 3: Finalizing results (T=0 snapshot removed - only show T=1 onwards) _state.qlbm_simulation_progress = 92 _state.qlbm_status_message = "Step 3: Finalizing results..." await _qlbm_flush_async() # Use T_list directly (no T=0 prepend) result_T_list = list(params["T_list"]) log_to_console(f"Results available for T={result_T_list}") # Store results simulation_data_frames = output simulation_times = [float(t) for t in result_T_list] # Update UI if hasattr(_ctrl, "qlbm_qiskit_result_update"): _ctrl.qlbm_qiskit_result_update(plotly_fig) _state.qlbm_max_time_step = len(output) - 1 _state.qlbm_time_val = 0 _state.qlbm_time_slider_labels = [f"T={t}" for t in result_T_list] _state.qlbm_simulation_has_run = True _state.qlbm_qiskit_mode = True _state.qlbm_simulation_progress = 100 log_to_console("IonQ QPU simulation completed successfully.") _state.qlbm_status_message = "Simulation completed successfully." _state.qlbm_status_type = "success" _state.qlbm_show_progress = False _qlbm_auto_hide_status_window(3.0) # Auto-hide after 3 seconds await _qlbm_flush_async() # === CUDA-Q Backend === elif _state.qlbm_backend_type == "Simulator" and _state.qlbm_selected_simulator == "CUDA-Q simulator": _state.qlbm_qiskit_mode = False # Use PyVista display _state.qlbm_status_message = "Running CUDA-Q simulation..." await _qlbm_flush_async() grid_size = int(_state.qlbm_grid_size) num_reg_qubits = int(math.log2(grid_size)) if grid_size > 0 else 3 T = int(_state.qlbm_time_steps) distribution_type = _state.qlbm_dist_type boundary_condition = _state.qlbm_boundary_condition vx_func = make_velocity_func(_state.qlbm_vx_expr) vy_func = make_velocity_func(_state.qlbm_vy_expr) vz_func = make_velocity_func(_state.qlbm_vz_expr) _state.qlbm_simulation_progress = 5 await _qlbm_flush_async() if simulate_qlbm_3D_and_animate is not None: log_to_console("Running CUDA-Q Simulation...") # Run CUDA-Q simulation in executor def _run_cudaq_blocking(): _plotter.clear() return simulate_qlbm_3D_and_animate( num_reg_qubits=num_reg_qubits, T=T, distribution_type=distribution_type, vx_input=vx_func, vy_input=vy_func, vz_input=vz_func, boundary_condition=boundary_condition, plotter=_plotter, add_slider=False, progress_callback=_progress_callback ) result = await loop.run_in_executor(executor, _run_cudaq_blocking) _, frames, times, grid_obj = result else: # Fallback to CPU demo if CUDA-Q not available log_to_console("CUDA-Q not available, falling back to CPU Demo...") def _run_cpu_demo_blocking(): return _run_cpu_demo_simulation( grid_size=grid_size, T=T, distribution_type=distribution_type or "Sinusoidal", vx_func=vx_func, vy_func=vy_func, vz_func=vz_func, progress_callback=_progress_callback ) frames, times, grid_obj = await loop.run_in_executor(executor, _run_cpu_demo_blocking) _state.qlbm_simulation_progress = 95 await _qlbm_flush_async() # Update plotter with results if grid_obj: _plotter.clear() isosurfaces = grid_obj.contour(isosurfaces=7, scalars="scalars") _plotter.add_mesh(isosurfaces, cmap="turbo", opacity=0.3, show_scalar_bar=True) _plotter.add_axes() _plotter.show_grid() # Store Results if frames and len(frames) > 0: simulation_data_frames = frames simulation_times = times current_grid_object = grid_obj _state.qlbm_max_time_step = len(frames) - 1 _state.qlbm_time_val = 0 _state.qlbm_time_slider_labels = [f"{t:.1f}" for t in times] if times else [str(i) for i in range(len(frames))] _state.qlbm_simulation_has_run = True _ensure_point_picking(on_pick_point) if hasattr(_ctrl, "qlbm_view_update"): _ctrl.qlbm_view_update() log_to_console("Simulation completed successfully.") _state.qlbm_status_message = "Simulation completed successfully." _state.qlbm_status_type = "success" _state.qlbm_simulation_progress = 100 _state.qlbm_show_progress = False _qlbm_auto_hide_status_window(3.0) # Auto-hide after 3 seconds await _qlbm_flush_async() else: _state.qlbm_run_error = "Simulation produced no data." log_to_console("Error: Simulation produced no data.") _state.qlbm_status_message = "Error: No data produced" _state.qlbm_status_type = "error" await _qlbm_flush_async() # === CPU Demo Backend (for QPU or fallback) === else: _state.qlbm_qiskit_mode = False # Use PyVista display _state.qlbm_status_message = "Running CPU Demo simulation..." await _qlbm_flush_async() grid_size = int(_state.qlbm_grid_size) num_reg_qubits = int(math.log2(grid_size)) if grid_size > 0 else 3 T = int(_state.qlbm_time_steps) distribution_type = _state.qlbm_dist_type boundary_condition = _state.qlbm_boundary_condition vx_func = make_velocity_func(_state.qlbm_vx_expr) vy_func = make_velocity_func(_state.qlbm_vy_expr) vz_func = make_velocity_func(_state.qlbm_vz_expr) _progress_callback(0) _state.qlbm_simulation_progress = 5 await _qlbm_flush_async() # CPU Demo Simulation in executor log_to_console("Running CPU Demo Simulation...") def _run_cpu_demo_fallback(): return _run_cpu_demo_simulation( grid_size=grid_size, T=T, distribution_type=distribution_type or "Sinusoidal", vx_func=vx_func, vy_func=vy_func, vz_func=vz_func, progress_callback=_progress_callback ) frames, times, grid_obj = await loop.run_in_executor(executor, _run_cpu_demo_fallback) _state.qlbm_simulation_progress = 95 await _qlbm_flush_async() # Update plotter with results if grid_obj: _plotter.clear() isosurfaces = grid_obj.contour(isosurfaces=7, scalars="scalars") _plotter.add_mesh(isosurfaces, cmap="Blues", opacity=0.3, show_scalar_bar=True) _plotter.add_axes() _plotter.show_grid() # Store Results if frames and len(frames) > 0: simulation_data_frames = frames simulation_times = times current_grid_object = grid_obj _state.qlbm_max_time_step = len(frames) - 1 _state.qlbm_time_val = 0 _state.qlbm_time_slider_labels = [f"{t:.1f}" for t in times] if times else [str(i) for i in range(len(frames))] _state.qlbm_simulation_has_run = True _ensure_point_picking(on_pick_point) if hasattr(_ctrl, "qlbm_view_update"): _ctrl.qlbm_view_update() log_to_console("Simulation completed successfully.") _state.qlbm_status_message = "Simulation completed successfully." _state.qlbm_status_type = "success" _state.qlbm_simulation_progress = 100 _state.qlbm_show_progress = False _qlbm_auto_hide_status_window(3.0) # Auto-hide after 3 seconds await _qlbm_flush_async() else: _state.qlbm_run_error = "Simulation produced no data." log_to_console("Error: Simulation produced no data.") _state.qlbm_status_message = "Error: No data produced" _state.qlbm_status_type = "error" await _qlbm_flush_async() except Exception as e: _state.qlbm_run_error = f"Simulation failed: {str(e)}" log_to_console(f"Simulation Error: {e}") print(f"Simulation Error: {e}") import traceback traceback.print_exc() _state.qlbm_status_message = "Simulation failed" _state.qlbm_status_type = "error" await _qlbm_flush_async() finally: _state.qlbm_is_running = False _qlbm_stop_progress_heartbeat() executor.shutdown(wait=False) if _state.qlbm_status_type != "success": _state.qlbm_show_progress = False await _qlbm_flush_async() def stop_simulation(): """Stop the running simulation.""" if _state is None: return _state.qlbm_is_running = False log_to_console("Simulation stopped by user") def reset_simulation(): """Reset the simulation state.""" global _plotter if _state is None: return _state.qlbm_is_running = False _state.qlbm_run_error = "" _state.qlbm_simulation_has_run = False _state.qlbm_qiskit_mode = False # Reset Qiskit mode _state.qlbm_dist_type = None _state.qlbm_show_edges = False _state.qlbm_problems_selection = None _state.qlbm_geometry_selection = None _state.qlbm_backend_type = None _state.qlbm_advecting_field = None _state.qlbm_show_advect_params = False if _plotter: _plotter.clear() if hasattr(_ctrl, "qlbm_view_update"): _ctrl.qlbm_view_update() _apply_workflow_highlights(_determine_workflow_step()) log_to_console("Simulation reset") def _register_handlers(): """Register state change handlers.""" @_state.change("qlbm_advecting_field") def _on_advect_dropdown_change(qlbm_advecting_field, **_): if qlbm_advecting_field: set_velocity_preset(qlbm_advecting_field) _apply_workflow_highlights(_determine_workflow_step()) @_state.change("qlbm_grid_index") def _on_grid_index_change(qlbm_grid_index, **_): """Map discrete slider index to allowed grid sizes.""" try: if qlbm_grid_index is None: return if isinstance(qlbm_grid_index, (int, float)): idx = int(qlbm_grid_index) idx = max(0, min(idx, len(GRID_SIZES) - 1)) val = GRID_SIZES[idx] if _state.qlbm_grid_size != val: _state.qlbm_grid_size = val fig, info, warn = update_qubit_3D_info(val) _state.qlbm_qubit_grid_info = info _state.qlbm_qubit_warning = warn if hasattr(_ctrl, "qlbm_qubit_plot_update"): _ctrl.qlbm_qubit_plot_update(fig) if _state.qlbm_nx != val: _state.qlbm_nx = val _state.qlbm_gauss_cx = val / 2 _state.qlbm_gauss_cy = val / 2 _state.qlbm_gauss_cz = val / 2 _state.qlbm_show_edges = True update_view() except Exception: pass finally: _apply_workflow_highlights(_determine_workflow_step()) @_state.change("qlbm_problems_selection") def _on_problem_selection_change(qlbm_problems_selection, **_): """Auto-select geometry based on the chosen problem.""" try: if not qlbm_problems_selection: _state.qlbm_geometry_selection = None return if isinstance(qlbm_problems_selection, str): normalized = qlbm_problems_selection.strip() _state.qlbm_geometry_selection = _PROBLEM_GEOMETRY_MAP.get(normalized) else: _state.qlbm_geometry_selection = None except Exception: _state.qlbm_geometry_selection = None finally: _apply_workflow_highlights(_determine_workflow_step()) @_state.change("qlbm_dist_type") def _on_dist_type_change(qlbm_dist_type, **_): if _state.qlbm_show_edges: _state.qlbm_show_edges = False update_view() _apply_workflow_highlights(_determine_workflow_step()) @_state.change("qlbm_show_edges", "qlbm_sine_k_x", "qlbm_sine_k_y", "qlbm_sine_k_z", "qlbm_gauss_cx", "qlbm_gauss_cy", "qlbm_gauss_cz", "qlbm_gauss_sigma", "qlbm_mdd_kx_log2", "qlbm_mdd_ky_log2", "qlbm_mdd_kz_log2") def on_param_change(**kwargs): update_view() _apply_workflow_highlights(_determine_workflow_step()) @_state.change("qlbm_geometry_selection", "qlbm_domain_L", "qlbm_domain_W", "qlbm_domain_H") def _on_geometry_selection_change(**_): update_geometry_view() _apply_workflow_highlights(_determine_workflow_step()) @_state.change("qlbm_backend_type") def _on_backend_type_change(**_): _apply_workflow_highlights(_determine_workflow_step()) @_state.change("qlbm_time_val") def update_time_frame(qlbm_time_val, **_): """Update the plotter with the frame corresponding to time_val.""" global simulation_data_frames, simulation_times, current_grid_object, _plotter if not _state.qlbm_simulation_has_run or not simulation_data_frames or current_grid_object is None: return try: idx = int(qlbm_time_val) if 0 <= idx < len(simulation_data_frames): current_grid_object["scalars"] = simulation_data_frames[idx].flatten() isosurfaces = current_grid_object.contour(isosurfaces=7, scalars="scalars") _plotter.clear() _plotter.add_mesh(isosurfaces, cmap="Blues", opacity=0.3, show_scalar_bar=True) _plotter.add_axes() _plotter.show_grid() t_val = simulation_times[idx] if idx < len(simulation_times) else idx _state.qlbm_current_time_label = f"{t_val:.2f}" if isinstance(t_val, float) else str(t_val) _plotter.add_text(f"Time: {t_val:.2f}" if isinstance(t_val, float) else f"Time: {t_val}", name="time_label", position="upper_right") _ensure_point_picking(on_pick_point) if hasattr(_ctrl, "qlbm_view_update"): _ctrl.qlbm_view_update() except Exception as e: print(f"Error updating time frame: {e}") def _build_control_panels(plotter): """Build the left control panel cards.""" # Overview card with vuetify3.VCard(classes="mb-2", style=("qlbm_overview_card_style", _WORKFLOW_BASE_STYLE)): vuetify3.VCardTitle("Overview", classes="text-subtitle-2 font-weight-bold text-primary") with vuetify3.VCardText(): vuetify3.VDivider(classes="my-2") vuetify3.VCardSubtitle("Problems", classes="text-caption font-weight-bold mt-2") with vuetify3.VTooltip(location="top"): with vuetify3.Template(v_slot_activator="{ props }"): vuetify3.VSelect( v_bind="props", key="qlbm_overview_problems", label="Select a problem", v_model=("qlbm_problems_selection", None), items=( "qlbm_qlbm_problems", [ "Scalar advection-diffusion in a box", "Laminar flow & heat transfer for a heated body in water.", ], ), placeholder="Select", density="compact", hide_details=True, color="primary", classes="mb-2" ) html.Span("Select a predefined fluid dynamics problem to solve") # Geometry card with vuetify3.VCard(classes="mb-2"): vuetify3.VCardTitle("Geometry", classes="text-subtitle-2 font-weight-bold text-primary") with vuetify3.VCardText(): vuetify3.VAlert( v_if="qlbm_geometry_selection", type="info", variant="tonal", density="compact", color="primary", children=["Selected Geometry: ", "{{ qlbm_geometry_selection }}"], classes="mb-2" ) vuetify3.VAlert( v_if="!qlbm_geometry_selection", type="info", variant="tonal", density="compact", color="primary", children=["No geometry selected. Choose a problem to auto-set."], classes="mb-2" ) with vuetify3.VContainer(v_if="qlbm_geometry_selection === 'Rectangular domain with a heated box (3D)'", classes="pa-0 mt-2"): vuetify3.VCardSubtitle("Domain dimensions", classes="text-caption font-weight-bold mb-2") with vuetify3.VRow(dense=True): with vuetify3.VCol(): with vuetify3.VTooltip(location="top"): with vuetify3.Template(v_slot_activator="{ props }"): vuetify3.VTextField(v_bind="props", label="Length (L)", v_model=("qlbm_domain_L", 1.0), type="number", step="0.1", density="compact", hide_details=True, color="primary") html.Span("Length of the domain along X axis") with vuetify3.VCol(): with vuetify3.VTooltip(location="top"): with vuetify3.Template(v_slot_activator="{ props }"): vuetify3.VTextField(v_bind="props", label="Width (W)", v_model=("qlbm_domain_W", 1.0), type="number", step="0.1", density="compact", hide_details=True, color="primary") html.Span("Width of the domain along Y axis") with vuetify3.VCol(): with vuetify3.VTooltip(location="top"): with vuetify3.Template(v_slot_activator="{ props }"): vuetify3.VTextField(v_bind="props", label="Height (H)", v_model=("qlbm_domain_H", 1.0), type="number", step="0.1", density="compact", hide_details=True, color="primary") html.Span("Height of the domain along Z axis") # Initial Distribution card with vuetify3.VCard(classes="mb-2", style=("qlbm_distribution_card_style", _WORKFLOW_BASE_STYLE)): vuetify3.VCardTitle("Initial Distribution", classes="text-subtitle-2 font-weight-bold text-primary") with vuetify3.VCardText(): with vuetify3.VRow(classes="d-flex align-center mb-2", no_gutters=True): with vuetify3.VCol(cols="auto", classes="flex-grow-1"): with vuetify3.VTooltip(location="top"): with vuetify3.Template(v_slot_activator="{ props }"): vuetify3.VSelect( v_bind="props", label="Initial Distribution", v_model=("qlbm_dist_type", None), items=("qlbm_dist_modes",), density="compact", hide_details=True ) html.Span("Select the initial density distribution function") with vuetify3.VCol(cols="auto", classes="ml-2"): with vuetify3.VBtn( icon=True, density="compact", variant="text", click="qlbm_custom_dist_params = !qlbm_custom_dist_params" ): vuetify3.VIcon("mdi-cog", color=("qlbm_custom_dist_params ? 'primary' : 'grey'",)) # Sinusoidal controls with vuetify3.VCard(classes="mb-2", v_if="qlbm_custom_dist_params && qlbm_dist_type === 'Sinusoidal'"): vuetify3.VCardTitle("Sinusoidal Frequencies") with vuetify3.VCardText(): for axis in ['x', 'y', 'z']: with vuetify3.VTooltip(location="top"): with vuetify3.Template(v_slot_activator="{ props }"): vuetify3.VSlider( v_bind="props", label=f"Freq {axis.upper()}", v_model=(f"qlbm_sine_k_{axis}", 1.0), min=1, max=5, step=1, thumb_label="always", density="compact" ) html.Span(f"Frequency multiplier for {axis.upper()} axis") # Gaussian controls with vuetify3.VCard(classes="mb-2", v_if="qlbm_custom_dist_params && qlbm_dist_type === 'Gaussian'"): vuetify3.VCardTitle("Gaussian Parameters") with vuetify3.VCardText(): for axis in ['x', 'y', 'z']: with vuetify3.VTooltip(location="top"): with vuetify3.Template(v_slot_activator="{ props }"): vuetify3.VSlider( v_bind="props", label=f"Center {axis.upper()}", v_model=(f"qlbm_gauss_c{axis}", 16), min=0, max=("qlbm_nx", 32), step=1, thumb_label="always", density="compact" ) html.Span(f"Center position along {axis.upper()} axis") with vuetify3.VTooltip(location="top"): with vuetify3.Template(v_slot_activator="{ props }"): vuetify3.VSlider( v_bind="props", label="Width (Sigma)", v_model=("qlbm_gauss_sigma", 6.0), min=1.0, max=20.0, step=0.5, thumb_label="always", density="compact" ) html.Span("Standard deviation (spread) of the Gaussian") # Multi-Dirac-Delta controls with vuetify3.VCard(classes="mb-2", v_if="qlbm_custom_dist_params && qlbm_dist_type === 'Multi-Dirac-Delta'"): vuetify3.VCardTitle("Multi-Dirac-Delta Parameters") with vuetify3.VCardText(): vuetify3.VCardSubtitle("Number of delta peaks per axis = 2^k", classes="text-caption mb-2") for axis in ['x', 'y', 'z']: with vuetify3.VTooltip(location="top"): with vuetify3.Template(v_slot_activator="{ props }"): vuetify3.VSlider( v_bind="props", label=f"k_{axis.upper()} (log₂)", v_model=(f"qlbm_mdd_k{axis}_log2", 1), min=1, max=4, step=1, thumb_label="always", density="compact" ) html.Span(f"Log2 of number of peaks along {axis.upper()}") # Boundary Conditions with vuetify3.VCard(classes="mb-2"): vuetify3.VCardTitle("Boundary Conditions", classes="text-subtitle-2 font-weight-bold text-primary") with vuetify3.VCardText(): with vuetify3.VTooltip(location="top"): with vuetify3.Template(v_slot_activator="{ props }"): vuetify3.VSelect(v_bind="props", label="Boundary Condition", v_model=("qlbm_boundary_condition", "Periodic"), items=("['Periodic']",), density="compact", hide_details=True, color="primary") html.Span("Select boundary conditions for the simulation domain") # Advecting Fields with vuetify3.VCard(classes="mb-2", style=("qlbm_advect_card_style", _WORKFLOW_BASE_STYLE)): vuetify3.VCardTitle("Advecting Fields", classes="text-subtitle-2 font-weight-bold text-primary") with vuetify3.VCardText(): with vuetify3.VRow(classes="d-flex align-center mb-2", no_gutters=True): with vuetify3.VCol(cols="auto", classes="flex-grow-1"): with vuetify3.VTooltip(location="top"): with vuetify3.Template(v_slot_activator="{ props }"): vuetify3.VSelect( v_bind="props", label="Select Advecting field", v_model=("qlbm_advecting_field", None), items=("['Uniform', 'Swirl', 'Shear', 'TGV']",), density="compact", hide_details=True, color="primary", placeholder="Select", ) html.Span("Select the velocity field that transports the fluid") with vuetify3.VCol(cols="auto", classes="ml-2"): with vuetify3.VBtn( icon=True, density="compact", variant="text", click="qlbm_show_advect_params = !qlbm_show_advect_params" ): vuetify3.VIcon("mdi-cog", color=("qlbm_show_advect_params ? 'primary' : 'grey'",)) with vuetify3.VContainer(v_if="qlbm_show_advect_params", classes="pa-0 mt-2"): html.Div("Velocity Components", classes="text-caption mb-1") with vuetify3.VTooltip(location="top"): with vuetify3.Template(v_slot_activator="{ props }"): vuetify3.VTextField(v_bind="props", label="Velocity vx", v_model=("qlbm_vx_expr", "0.2"), density="compact", hide_details=True, color="primary", classes="mb-1") html.Span("X-component of velocity field") with vuetify3.VTooltip(location="top"): with vuetify3.Template(v_slot_activator="{ props }"): vuetify3.VTextField(v_bind="props", label="Velocity vy", v_model=("qlbm_vy_expr", "-0.15"), density="compact", hide_details=True, color="primary", classes="mb-1") html.Span("Y-component of velocity field") with vuetify3.VTooltip(location="top"): with vuetify3.Template(v_slot_activator="{ props }"): vuetify3.VTextField(v_bind="props", label="Velocity vz", v_model=("qlbm_vz_expr", "0.3"), density="compact", hide_details=True, color="primary") html.Span("Z-component of velocity field") # Meshing with vuetify3.VCard(classes="mb-2", style=("qlbm_meshing_card_style", _WORKFLOW_BASE_STYLE)): vuetify3.VCardTitle("Meshing", classes="text-subtitle-2 font-weight-bold text-primary") with vuetify3.VCardText(): with vuetify3.VMenu(open_on_hover=True, close_on_content_click=False, location="end"): with vuetify3.Template(v_slot_activator="{ props }"): with vuetify3.VSlider( v_bind="props", label="Number of Points / Direction", v_model=("qlbm_grid_index", 2), min=0, max=5, step=1, thumb_label="always", show_ticks="always", color="primary", density="compact", hide_details=True ): vuetify3.Template(v_slot_thumb_label="{ modelValue }", children=["{{ ['8','16','32','64','128','256'][modelValue] }}"]) with vuetify3.VSheet(classes="pa-2", elevation=6, rounded=True, style="width: 700px;"): with vuetify3.VContainer(fluid=True, classes="pa-0"): qubit_fig = plotly_widgets.Figure(figure=go.Figure(), style="width: 616px; height: 320px; min-height: 320px;", responsive=True) _ctrl.qlbm_qubit_plot_update = qubit_fig.update html.Div("{{ qlbm_qubit_grid_info }}", classes="mt-2 text-caption") html.Div("{{ qlbm_qubit_warning }}", classes="warn-text") vuetify3.VAlert(v_if="qlbm_grid_size > 32", type="warning", variant="tonal", density="compact", children=["Warning: High grid size may impact performance."], classes="mt-2") # Time with vuetify3.VCard(classes="mb-2"): vuetify3.VCardTitle("Time", classes="text-subtitle-2 font-weight-bold text-primary") with vuetify3.VCardText(): with vuetify3.VTooltip(location="top"): with vuetify3.Template(v_slot_activator="{ props }"): vuetify3.VSlider(v_bind="props", label="Total Time", v_model=("qlbm_time_steps", 10), min=0, max=30, step=1, thumb_label="always", show_ticks="always", color="primary", density="compact", hide_details=True) html.Span("Number of time steps to simulate") vuetify3.VAlert(v_if="qlbm_time_steps > 100", type="warning", variant="tonal", density="compact", children=["Warning: High time steps may increase runtime."], classes="mt-2") # Backends with vuetify3.VCard(classes="mb-2", style=("qlbm_backend_card_style", _WORKFLOW_BASE_STYLE)): vuetify3.VCardTitle("Backends", classes="text-subtitle-2 font-weight-bold text-primary") with vuetify3.VCardText(): with vuetify3.VRow(dense=True, classes="mb-2"): with vuetify3.VCol(): vuetify3.VAlert( type="info", color="primary", variant="tonal", density="compact", children=[ "Selected: ", "{{ qlbm_backend_type || 'None - Please select a backend' }}", " - ", "{{ qlbm_backend_type === 'Simulator' ? (qlbm_selected_simulator || '—') : (qlbm_backend_type === 'QPU' ? (qlbm_selected_qpu || '—') : '—') }}", ], ) with vuetify3.VMenu(open_on_hover=True, close_on_content_click=True, location="end"): with vuetify3.Template(v_slot_activator="{ props }"): vuetify3.VBtn(v_bind="props", text="Choose Backend", color="primary", variant="tonal", block=True) with vuetify3.VList(density="compact"): with vuetify3.VMenu(open_on_hover=True, close_on_content_click=True, location="end", offset=8): with vuetify3.Template(v_slot_activator="{ props }"): vuetify3.VListItem(v_bind="props", title="Simulator", prepend_icon="mdi-robot-outline", append_icon="mdi-chevron-right") with vuetify3.VList(density="compact"): vuetify3.VListItem(title="CUDA-Q simulator", click="qlbm_backend_type = 'Simulator'; qlbm_selected_simulator = 'CUDA-Q simulator'") vuetify3.VListItem(title="IBM Qiskit simulator", click="qlbm_backend_type = 'Simulator'; qlbm_selected_simulator = 'IBM Qiskit simulator'") with vuetify3.VMenu(open_on_hover=True, close_on_content_click=True, location="end", offset=8): with vuetify3.Template(v_slot_activator="{ props }"): vuetify3.VListItem(v_bind="props", title="QPU", prepend_icon="mdi-chip", append_icon="mdi-chevron-right") with vuetify3.VList(density="compact"): vuetify3.VListItem(title="IBM QPU", click="qlbm_backend_type = 'QPU'; qlbm_selected_qpu = 'IBM QPU'") vuetify3.VListItem(title="IonQ QPU", click="qlbm_backend_type = 'QPU'; qlbm_selected_qpu = 'IonQ QPU'") # IBM QPU Warning for grid > 16 vuetify3.VAlert( v_if="qlbm_backend_type === 'QPU' && qlbm_selected_qpu === 'IBM QPU' && qlbm_grid_size > 16", type="warning", variant="tonal", density="compact", children=["⚠️ Grid size > 16 may exceed IBM QPU capacity!"], classes="mt-2" ) # Sinusoidal Warning for IBM QPU vuetify3.VAlert( v_if="qlbm_backend_type === 'QPU' && qlbm_selected_qpu === 'IBM QPU' && qlbm_dist_type === 'Sinusoidal'", type="warning", variant="tonal", density="compact", children=["⚠️ Sinusoidal distribution results in very high circuit depth on IBM QPU!"], classes="mt-2" ) # IonQ Restriction Warning vuetify3.VAlert( v_if="qlbm_backend_type === 'QPU' && qlbm_selected_qpu === 'IonQ QPU' && qlbm_dist_type !== 'Multi-Dirac-Delta'", type="error", variant="tonal", density="compact", children=["⚠️ IonQ QPU only supports Multi-Dirac-Delta distribution."], classes="mt-2" ) vuetify3.VDivider(classes="my-3") vuetify3.VBtn( text="Run", color="primary", block=True, disabled=("qlbm_is_running || !qlbm_backend_type || (qlbm_backend_type === 'QPU' && qlbm_selected_qpu === 'IonQ QPU' && qlbm_dist_type !== 'Multi-Dirac-Delta')", True), click=run_simulation, style=("qlbm_is_running ? '' : 'background-color:#87CEFA;'", ""), ) # Backend mode and notes hidden as per user request # html.Div("Backend: {{ qlbm_simulation_backend_mode }}", classes="text-caption text-medium-emphasis mt-2") # vuetify3.VAlert( # v_if="qlbm_simulation_backend_note", # type="info", # variant="tonal", # density="compact", # children=["{{ qlbm_simulation_backend_note }}"], # classes="mt-2", # ) with vuetify3.VRow(dense=True, classes="mt-2"): with vuetify3.VCol(cols=6): vuetify3.VBtn( text="Reset", color="#8BC34A", variant="tonal", block=True, disabled=("qlbm_is_running", False), click=reset_simulation, ) with vuetify3.VCol(cols=6): vuetify3.VBtn( text="STOP", color="#FF7043", variant="tonal", block=True, click=stop_simulation, disabled=("!qlbm_is_running", True), ) # --- Job Result Upload Section --- vuetify3.VDivider(classes="my-3") html.Div("Upload Results", classes="text-subtitle-2 font-weight-bold text-primary mb-2") html.Div("Retrieve completed job results from IBM or IonQ using the Job ID", classes="text-caption text-medium-emphasis mb-2") # Platform selector with vuetify3.VTooltip(location="top"): with vuetify3.Template(v_slot_activator="{ props }"): vuetify3.VSelect( v_bind="props", label="Platform", v_model=("qlbm_job_platform", "IonQ"), items=("['IBM', 'IonQ']",), density="compact", hide_details=True, color="primary", classes="mb-2", prepend_icon="mdi-chip", ) html.Span("Select the quantum hardware provider (IBM or IonQ)") # Job ID input with vuetify3.VTooltip(location="top"): with vuetify3.Template(v_slot_activator="{ props }"): vuetify3.VTextField( v_bind="props", label="Job ID", v_model=("qlbm_job_id", ""), density="compact", hide_details=True, color="primary", classes="mb-2", placeholder="e.g., 019b368e-6e22-7525-8512-fd16e0503673", prepend_icon="mdi-identifier", ) html.Span("Enter the Job ID (UUID format from IBM or IonQ)") # Output resolution and Total Time in a row with vuetify3.VRow(dense=True, classes="mb-2"): with vuetify3.VCol(cols=6): with vuetify3.VTooltip(location="top"): with vuetify3.Template(v_slot_activator="{ props }"): vuetify3.VTextField( v_bind="props", label="Total Time", v_model=("qlbm_job_total_time", 3), type="number", density="compact", hide_details=True, color="primary", ) html.Span("Total number of time steps (T) used when running the job") with vuetify3.VCol(cols=6): with vuetify3.VTooltip(location="top"): with vuetify3.Template(v_slot_activator="{ props }"): vuetify3.VTextField( v_bind="props", label="Output Resolution", v_model=("qlbm_job_output_resolution", 40), type="number", density="compact", hide_details=True, color="primary", ) html.Span("Resolution for 3D visualization. Should be <= Grid Size (2^n).") # Generate button vuetify3.VBtn( text="Retrieve & Generate Plot", color="secondary", variant="tonal", block=True, disabled=("!qlbm_job_id || qlbm_job_is_processing", True), loading=("qlbm_job_is_processing", False), click=process_uploaded_job_result, prepend_icon="mdi-chart-box-outline", classes="mb-2", ) # Success message vuetify3.VAlert( v_if="qlbm_job_upload_success", type="success", variant="tonal", density="compact", closable=True, children=["{{ qlbm_job_upload_success }}"], classes="mt-2", ) # Error message vuetify3.VAlert( v_if="qlbm_job_upload_error", type="error", variant="tonal", density="compact", closable=True, children=["{{ qlbm_job_upload_error }}"], classes="mt-2", ) def _build_visualization_panel(plotter): """Build the right visualization panel.""" # Main Plot Card with vuetify3.VCard(classes="mb-1 flex-grow-1 d-flex flex-column", elevation=2, style="min-height: 0;"): # Geometry Preview (Plotly) - when no simulation and no distribution selected with vuetify3.VContainer(v_if="!qlbm_simulation_has_run && !qlbm_dist_type && qlbm_geometry_selection", fluid=True, classes="pa-0 flex-grow-1", style="width: 100%; height: 100%;"): geom_fig = plotly_widgets.Figure(figure=go.Figure(), style="width: 100%; height: 100%;", responsive=True) _ctrl.qlbm_geometry_plot_update = geom_fig.update # Distribution Preview (Plotly) - when distribution selected but no simulation with vuetify3.VContainer(v_if="!qlbm_simulation_has_run && qlbm_dist_type", fluid=True, classes="pa-0 flex-grow-1", style="width: 100%; height: 100%;"): preview_fig = plotly_widgets.Figure(figure=go.Figure(), style="width:100%; height:100%;", responsive=True) _ctrl.qlbm_preview_update = preview_fig.update # Download controls (for both modes) with vuetify3.VContainer(v_if="qlbm_simulation_has_run", classes="px-4 pt-3 pb-1 d-flex justify-end", style="width: 100%; flex: 0 0 auto;"): with vuetify3.VMenu(location="bottom end"): with vuetify3.Template(v_slot_activator="{ props }"): vuetify3.VBtn( v_bind="props", text="Download", color="primary", variant="tonal", prepend_icon="mdi-download" ) with vuetify3.VList(density="compact"): # VTK and MP4 exports only for non-Qiskit mode vuetify3.VListItem( v_if="!qlbm_qiskit_mode", title="Export as VTK", prepend_icon="mdi-content-save", click=export_simulation_vtk ) vuetify3.VListItem( v_if="!qlbm_qiskit_mode", title="Export as MP4", prepend_icon="mdi-movie", click=export_simulation_mp4 ) # TODO: Add Plotly HTML export for Qiskit mode vuetify3.VListItem( v_if="qlbm_qiskit_mode", title="Export as HTML (Plotly)", prepend_icon="mdi-language-html5", disabled=True, # Not yet implemented ) # === Qiskit Simulation Result (Plotly with built-in slider) === with vuetify3.VContainer(v_if="qlbm_simulation_has_run && qlbm_qiskit_mode", fluid=True, classes="pa-0 flex-grow-1", style="width: 100%; height: 100%;"): qiskit_fig = plotly_widgets.Figure( figure=go.Figure(), style="width:100%; height:100%;", responsive=True ) _ctrl.qlbm_qiskit_result_update = qiskit_fig.update # === PyVista Simulation Result (for CUDA-Q/CPU demo) === with vuetify3.VContainer(v_if="qlbm_simulation_has_run && !qlbm_qiskit_mode", fluid=True, classes="pa-0 flex-grow-1", style="width: 100%; height: 100%;"): view = plotter_ui(plotter) _ctrl.qlbm_view_update = view.update # Time Slider (only for non-Qiskit mode - Qiskit Plotly has built-in slider) with vuetify3.VContainer(v_if="qlbm_simulation_has_run && !qlbm_qiskit_mode", classes="px-4 pb-4", style="width: 90%; flex: 0 0 auto;"): with vuetify3.VSlider( v_model=("qlbm_time_val", 0), min=0, max=("qlbm_max_time_step", 10), step=1, label="Time", thumb_label="always", density="compact", hide_details=True, color="primary" ): vuetify3.Template( v_slot_thumb_label="{ modelValue }", children=["{{ qlbm_time_slider_labels[modelValue] || modelValue }}"] ) # Console Window with vuetify3.VCard(classes="mt-1", style="font-size: 0.8rem; flex: 0 0 auto;"): vuetify3.VCardTitle("Status", classes="text-subtitle-1 text-primary", style="font-size: 0.9rem; padding: 6px 10px;") with vuetify3.VCardText(classes="py-1 px-2", style="height: 150px; overflow-y: auto; background-color: #f5f5f5; font-family: monospace;"): vuetify3.VTextarea( v_model=("qlbm_console_output", ""), readonly=True, auto_grow=False, rows=6, variant="plain", hide_details=True, style="font-family: monospace; width: 100%; height: 100%;" ) def _build_status_window(): """Build the floating status window.""" with vuetify3.VCard( v_if="qlbm_status_visible", style="position: fixed; bottom: 16px; right: 16px; z-index: 1000; min-width: 320px; max-width: 450px;", elevation=8 ): with vuetify3.VCardTitle(classes="d-flex align-center", style="font-size: 0.95rem; padding: 8px 12px;"): vuetify3.VIcon("mdi-information-outline", size="small", classes="mr-2") html.Span("Simulation Status") vuetify3.VSpacer() vuetify3.VBtn( icon="mdi-close", size="x-small", variant="text", click="qlbm_status_visible = false" ) vuetify3.VDivider() with vuetify3.VCardText(classes="py-2 px-3"): vuetify3.VAlert( type=("qlbm_status_type", "info"), variant="tonal", density="compact", children=["{{ qlbm_status_message }}"] ) with vuetify3.VContainer(v_if="qlbm_show_progress", classes="pa-0 mt-2"): vuetify3.VProgressLinear( model_value=("qlbm_simulation_progress", 0), color="primary", height=6, striped=True ) html.Div( "{{ qlbm_simulation_progress }}% complete", classes="text-caption text-center mt-1", style="font-size: 0.75rem;" )