quantum / qlbm_embedded.py
harishaseebat92
QLBM :IBM Jobs and Removed JSON file
b112afa
"""
QLBM Embedded Mode Module
This module provides functions to build the QLBM UI into an existing Trame server,
enabling single-server architecture for the unified app.
Contains ALL features from qlbm.py but designed for embedded use.
"""
import os
import numpy as np
import math
import pyvista as pv
import plotly.graph_objects as go
import tempfile
import base64
import json
import asyncio
import threading
import time as time_module
from pathlib import Path
from datetime import datetime
from trame_vuetify.widgets import vuetify3
from trame.widgets import html
from trame_plotly.widgets import plotly as plotly_widgets
from pyvista.trame.ui import plotter_ui
# Set offscreen before pyvista usage
pv.OFF_SCREEN = True
# --- Qiskit Backend Detection ---
_QISKIT_BACKEND_AVAILABLE = False
_QISKIT_IMPORT_ERROR = None
_VISUALIZE_COUNTS_AVAILABLE = False
try:
from qlbm.qlbm_sample_app import (
run_sampling_sim,
run_sampling_hw_ibm,
run_sampling_hw_ionq,
get_named_init_state_circuit,
str_to_lambda,
_create_slider_figure,
show_initial_distribution,
)
_QISKIT_BACKEND_AVAILABLE = True
except ImportError as e:
_QISKIT_IMPORT_ERROR = str(e)
print(f"Qiskit backend not available: {e}")
# Import visualize_counts for job result processing
try:
from qlbm.visualize_counts import (
load_samples,
estimate_density,
plot_density_isosurface_slider,
)
_VISUALIZE_COUNTS_AVAILABLE = True
except ImportError as e:
print(f"visualize_counts not available: {e}")
load_samples = None
estimate_density = None
plot_density_isosurface_slider = None
# --- CUDA-Q Backend Detection ---
def _env_flag(name: str) -> bool:
return os.environ.get(name, "").strip().lower() in ("1", "true", "yes")
def _should_disable_quantum_backend() -> str | None:
"""Return a reason string if quantum backend should be disabled, else None."""
if _env_flag("FORCE_CPU_DEMO"):
return "FORCE_CPU_DEMO environment variable is set"
if _env_flag("HUGGINGFACE_SPACE") or os.environ.get("SPACE_ID"):
return "Hugging Face Spaces detected (no GPU runtime)"
return None
_disable_reason = _should_disable_quantum_backend()
simulate_qlbm_3D_and_animate = None
if _disable_reason:
_SIMULATION_BACKEND_READY = False
_SIMULATION_BACKEND_NOTE = f"CPU demo mode active ({_disable_reason}). Results are approximate."
_SIMULATION_MODE_LABEL = "CPU demo backend"
_SIMULATION_DISABLED_REASON = _disable_reason
else:
try:
from qlbm.fluid3d_pyvista import simulate_qlbm_3D_and_animate
_SIMULATION_BACKEND_READY = True
_SIMULATION_BACKEND_NOTE = ""
_SIMULATION_MODE_LABEL = "Quantum CUDA-Q backend"
_SIMULATION_DISABLED_REASON = None
except Exception as exc:
simulate_qlbm_3D_and_animate = None
_SIMULATION_BACKEND_READY = False
_SIMULATION_BACKEND_NOTE = f"CPU demo mode active (import error: {exc}). Results are approximate."
_SIMULATION_MODE_LABEL = "CPU demo backend"
_SIMULATION_DISABLED_REASON = str(exc)
_SIMULATION_CAN_RUN = True # CPU demo is always available
_CPU_DEMO_MAX_GRID = 48
# Module-level state
_server = None
_state = None
_ctrl = None
_plotter = None
_initialized = False
# Global simulation data
simulation_data_frames = []
simulation_times = []
current_grid_object = None
# --- Async infrastructure for real-time progress updates ---
_qlbm_main_loop = None # Reference to main event loop for thread-safe callbacks
_qlbm_heartbeat_thread = None
_qlbm_heartbeat_on = False
_qlbm_sim_start_time = None
def _qlbm_flush_state():
"""Force state flush to browser (synchronous, for main thread use)."""
try:
if _server:
_server.state.flush()
except Exception:
pass
def _qlbm_flush_state_threadsafe():
"""
Thread-safe state flush - schedules flush on the main event loop.
Use this from background threads (e.g., inside executor callbacks).
"""
global _qlbm_main_loop
try:
if _server and _qlbm_main_loop is not None and _qlbm_main_loop.is_running():
# Schedule the flush on the main event loop
_qlbm_main_loop.call_soon_threadsafe(_server.state.flush)
elif _server:
# Fallback: direct flush (may not work from threads)
_server.state.flush()
except Exception:
pass
async def _qlbm_flush_async():
"""Async helper to flush state and yield to event loop."""
_qlbm_flush_state()
await asyncio.sleep(0) # Yield control to event loop
def _qlbm_start_progress_heartbeat():
"""Start background thread for continuous progress updates."""
global _qlbm_heartbeat_thread, _qlbm_heartbeat_on, _qlbm_sim_start_time
if _qlbm_heartbeat_thread and _qlbm_heartbeat_thread.is_alive():
return
_qlbm_sim_start_time = time_module.time()
def loop_fn():
global _qlbm_heartbeat_on
while _qlbm_heartbeat_on:
if _state is not None and _state.qlbm_is_running and _qlbm_sim_start_time is not None:
elapsed = time_module.time() - _qlbm_sim_start_time
# Optionally update elapsed time state here if needed
_qlbm_flush_state_threadsafe()
time_module.sleep(0.1) # Update every 100ms
_qlbm_heartbeat_on = True
_qlbm_heartbeat_thread = threading.Thread(target=loop_fn, daemon=True)
_qlbm_heartbeat_thread.start()
def _qlbm_stop_progress_heartbeat():
"""Stop the background heartbeat thread."""
global _qlbm_heartbeat_on, _qlbm_heartbeat_thread
_qlbm_heartbeat_on = False
_qlbm_heartbeat_thread = None
def _qlbm_auto_hide_status_window(delay_seconds=3.0):
"""
Schedule the QLBM status window to auto-hide after a delay.
Shows the completion message briefly then closes automatically.
"""
def _hide_after_delay():
time_module.sleep(delay_seconds)
if _state is not None:
_state.qlbm_status_visible = False
_qlbm_flush_state_threadsafe()
hide_thread = threading.Thread(target=_hide_after_delay, daemon=True)
hide_thread.start()
GRID_SIZES = [8, 16, 32, 64, 128, 256]
_WORKFLOW_BASE_STYLE = "font-size: 0.8rem; border: 1px solid transparent; transition: box-shadow 0.2s ease;"
_WORKFLOW_HIGHLIGHT_STYLE = "font-size: 0.8rem; box-shadow: 0 0 0 2px #6200ea;"
_WORKFLOW_CARD_KEYS = ["overview_card_style", "distribution_card_style", "advect_card_style", "meshing_card_style", "backend_card_style"]
_PROBLEM_GEOMETRY_MAP = {
"Scalar advection-diffusion in a box": "Cube",
"Laminar flow & heat transfer for a heated body in water.": "Rectangular domain with a heated box (3D)",
}
_QLBM_PREVIEW_COLORSCALE = "Turbo"
def set_server(server):
"""Set the server for embedded mode."""
global _server, _state, _ctrl
_server = server
_state = server.state
_ctrl = server.controller
def init_state():
"""Initialize QLBM state variables with all features from qlbm.py."""
global _initialized
if _initialized or _state is None:
return
_state.update({
# Console & Status
"qlbm_console_output": "QLBM Console initialized.\n",
"qlbm_status_visible": True,
"qlbm_status_message": "Ready",
"qlbm_status_type": "info",
"qlbm_simulation_progress": 0,
"qlbm_show_progress": False,
# Distribution
"qlbm_dist_modes": ["Sinusoidal", "Gaussian", "Multi-Dirac-Delta"],
"qlbm_dist_type": None,
"qlbm_nx": 32,
"qlbm_show_edges": False,
"qlbm_custom_dist_params": False,
# Sinusoidal params
"qlbm_sine_k_x": 1.0,
"qlbm_sine_k_y": 1.0,
"qlbm_sine_k_z": 1.0,
# Gaussian params
"qlbm_gauss_cx": 16.0,
"qlbm_gauss_cy": 16.0,
"qlbm_gauss_cz": 16.0,
"qlbm_gauss_sigma": 6.0,
# Multi-Dirac-Delta params (log2 of frequency multipliers)
"qlbm_mdd_kx_log2": 1,
"qlbm_mdd_ky_log2": 1,
"qlbm_mdd_kz_log2": 1,
# Problem & Geometry
"qlbm_qlbm_problems": [
"Scalar advection-diffusion in a box",
"Laminar flow & heat transfer for a heated body in water.",
],
"qlbm_problems_selection": None,
"qlbm_geometry_selection": None,
"qlbm_domain_L": 1.0,
"qlbm_domain_W": 1.0,
"qlbm_domain_H": 1.0,
# Boundary conditions
"qlbm_boundary_condition": "Periodic",
# Advecting fields
"qlbm_advecting_field": None,
"qlbm_show_advect_params": False,
"qlbm_vx_expr": "0.2",
"qlbm_vy_expr": "-0.15",
"qlbm_vz_expr": "0.3",
# Meshing
"qlbm_grid_index": 2, # Index into GRID_SIZES
"qlbm_grid_size": 32,
"qlbm_time_steps": 10,
# Backend
"qlbm_backend_type": None,
"qlbm_selected_simulator": None,
"qlbm_selected_qpu": None,
# Simulation state
"qlbm_is_running": False,
"qlbm_run_error": "",
"qlbm_simulation_has_run": False,
"qlbm_time_val": 0,
"qlbm_max_time_step": 0,
"qlbm_time_slider_labels": [],
"qlbm_current_time_label": "0.0",
# Qubit info
"qlbm_qubit_grid_info": "Grid Size: 32 × 32 × 32",
"qlbm_qubit_warning": "",
# Backend info
"qlbm_simulation_backend_ready": _SIMULATION_CAN_RUN,
"qlbm_simulation_backend_note": _SIMULATION_BACKEND_NOTE,
"qlbm_simulation_backend_mode": _SIMULATION_MODE_LABEL,
# Workflow highlighting
"qlbm_workflow_step": 0,
"qlbm_overview_card_style": _WORKFLOW_BASE_STYLE,
"qlbm_distribution_card_style": _WORKFLOW_BASE_STYLE,
"qlbm_advect_card_style": _WORKFLOW_BASE_STYLE,
"qlbm_meshing_card_style": _WORKFLOW_BASE_STYLE,
"qlbm_backend_card_style": _WORKFLOW_BASE_STYLE,
# Pick point text
"qlbm_pick_text": "",
# Qiskit backend state
"qlbm_qiskit_mode": False, # True when using Qiskit backend (shows Plotly slider)
"qlbm_qiskit_backend_available": _QISKIT_BACKEND_AVAILABLE,
"qlbm_qiskit_fig": None, # Stores the Plotly figure for Qiskit results
# Job retrieval state (for loading previously saved QPU job results)
"qlbm_job_upload_error": "", # Error message for retrieval
"qlbm_job_upload_success": "", # Success message for retrieval
"qlbm_job_platform": "IonQ", # Platform: IonQ or IBM
"qlbm_job_id": "", # Job ID text field for direct entry
"qlbm_job_total_time": 3, # Total time T (generates T_list = [1..T])
"qlbm_job_output_resolution": 40, # Grid resolution for density estimation
"qlbm_job_is_processing": False, # True when processing job
"qlbm_job_flag_qubits": True, # Whether flag qubits were used
"qlbm_job_midcircuit_meas": False, # IonQ uses False, IBM uses True
})
_initialized = True
def log_to_console(message):
"""Log a message to the QLBM console."""
if _state is None:
return
timestamp = datetime.now().strftime("%H:%M:%S")
new_line = f"[{timestamp}] {message}\n"
_state.qlbm_console_output = (_state.qlbm_console_output or "") + new_line
def _set_pick_text(text):
"""Set the pick text for point picking."""
if _state is not None:
_state.qlbm_pick_text = text
def _create_plotter():
"""Create and return the PyVista plotter."""
global _plotter
if _plotter is None:
pv.OFF_SCREEN = True
_plotter = pv.Plotter()
return _plotter
def _ensure_point_picking(callback):
"""Enable point picking on the plotter."""
global _plotter
if _plotter is None:
return
try:
_plotter.enable_point_picking(
callback=callback,
show_message=False,
use_picker=True,
pickable_window=False,
show_point=True,
point_size=12,
color="red",
)
except Exception:
pass
# --- Workflow Highlighting ---
def _determine_workflow_step():
"""Determine current workflow step based on state."""
if _state is None:
return 0
if not _state.qlbm_problems_selection:
return 0
if not _state.qlbm_dist_type:
return 1
if not _state.qlbm_advecting_field:
return 2
if not _state.qlbm_backend_type:
return 4
return 5
def _apply_workflow_highlights(step):
"""Apply highlighting to the current workflow step card."""
if _state is None:
return
for i, key in enumerate(_WORKFLOW_CARD_KEYS):
attr = f"qlbm_{key}"
if hasattr(_state, attr):
setattr(_state, attr, _WORKFLOW_HIGHLIGHT_STYLE if i == step else _WORKFLOW_BASE_STYLE)
# --- Qubit Info ---
def update_qubit_3D_info(grid_size: int):
"""Generate qubit requirement plot and info strings."""
try:
num_reg_qubits = int(math.log2(grid_size)) if grid_size > 0 else 3
x = np.array([16, 32, 64, 128, 256])
y = np.log2(x).astype(int)
fig = go.Figure()
fig.add_trace(go.Scatter(x=x, y=y, mode='lines', name='Qubits/Direction', line=dict(color='#7A3DB5', width=3)))
fig.add_trace(go.Scatter(x=[grid_size], y=[num_reg_qubits], mode='markers',
marker=dict(size=12, color='red'), name='Current Selection'))
fig.update_layout(
xaxis_title="Grid Size (Points/Direction)",
yaxis_title="Qubits/Direction",
width=616,
height=320,
margin=dict(l=40, r=20, t=20, b=40)
)
grid_display = f"Grid Size: {grid_size} × {grid_size} × {grid_size}"
warning = ""
if grid_size > 64:
warning = "⚠️ Warning: Grid sizes > 64 may exceed simulator/memory limits!"
elif grid_size > 16 and _state and _state.qlbm_selected_qpu == "IBM QPU" and _state.qlbm_backend_type == "QPU":
warning = "⚠️ Warning: Grid size > 16 may exceed IBM QPU capacity!"
return fig, grid_display, warning
except Exception:
return go.Figure(), "Grid Size: N/A", ""
# --- Velocity Presets ---
def set_velocity_preset(preset_name):
"""Map velocity preset buttons to expression triplets."""
if _state is None:
return
mapping = {
"Uniform": ("0.6", "-0.5", "1"),
"Swirl": ("sin(-2*pi*z)", "1", "sin(2*pi*x)"),
"Shear": ("abs(z-0.5)*4-1", "0", "0"),
"TGV": ("0.5*cos(2*pi*x)*sin(2*pi*y)*sin(2*pi*z)", "-*sin(2*pi*x)*cos(2*pi*y)*sin(2*pi*z)", "0.5*sin(2*pi*x)*sin(2*pi*y)*cos(2*pi*z)"),
}
vx, vy, vz = mapping.get(preset_name, mapping["Uniform"])
_state.qlbm_advecting_field = preset_name
_state.qlbm_vx_expr = vx
_state.qlbm_vy_expr = vy
_state.qlbm_vz_expr = vz
def make_velocity_func(expr):
"""Convert a string expression into a function of (x, y, z)."""
def func(x, y, z):
context = {
"x": x, "y": y, "z": z,
"sin": np.sin, "cos": np.cos, "tan": np.tan,
"pi": np.pi, "abs": np.abs, "exp": np.exp, "sqrt": np.sqrt
}
try:
return eval(str(expr), {"__builtins__": {}}, context)
except Exception as e:
print(f"Error evaluating velocity expression '{expr}': {e}")
return np.zeros_like(x) if isinstance(x, np.ndarray) else 0.0
return func
def _safe_velocity_sample(func) -> float:
try:
val = func(0.5, 0.5, 0.5)
if isinstance(val, np.ndarray):
val = float(np.mean(val))
return float(val)
except Exception:
return 0.0
def build_ui():
"""Build the QLBM UI into the current Trame context."""
if _state is None:
raise RuntimeError("Server not set. Call set_server() first.")
init_state()
plotter = _create_plotter()
# Register state change handlers
_register_handlers()
# Apply initial CSS
html.Style("""
:root{ --v-theme-primary:95,37,159; }
.example-img{ max-width:100%; border-radius:4px; }
.warn-text{ color:#b71c1c; font-size:0.85rem; }
""")
# Build the UI
with vuetify3.VContainer(fluid=True, classes="pa-0 fill-height"):
with vuetify3.VRow(no_gutters=True, classes="fill-height"):
# Left Column: Controls
with vuetify3.VCol(cols=5, classes="pa-2 d-flex flex-column", style="overflow-y: auto; max-height: 200vh;"):
_build_control_panels(plotter)
# Right Column: Visualization
with vuetify3.VCol(cols=7, classes="pa-1 d-flex flex-column"):
_build_visualization_panel(plotter)
# Floating status window
_build_status_window()
# --- Distribution Figure Functions ---
def get_initial_distribution_figure(distribution_type, N, show_edges=False):
"""Generate a 3D Plotly figure for the initial distribution."""
if _state is None:
return go.Figure()
if distribution_type == "Sinusoidal":
kx = max(1.0, round(float(_state.qlbm_sine_k_x))) if hasattr(_state, "qlbm_sine_k_x") else 1.0
ky = max(1.0, round(float(_state.qlbm_sine_k_y))) if hasattr(_state, "qlbm_sine_k_y") else 1.0
kz = max(1.0, round(float(_state.qlbm_sine_k_z))) if hasattr(_state, "qlbm_sine_k_z") else 1.0
selected_func = lambda x, y, z: \
np.sin(x * 2 * np.pi * kx / N) * \
np.sin(y * 2 * np.pi * ky / N) * \
np.sin(z * 2 * np.pi * kz / N) + 1
title = f"Sinusoidal Distribution (N={N})"
elif distribution_type == "Gaussian":
cx = _state.qlbm_gauss_cx if hasattr(_state, "qlbm_gauss_cx") else N/2
cy = _state.qlbm_gauss_cy if hasattr(_state, "qlbm_gauss_cy") else N/2
cz = _state.qlbm_gauss_cz if hasattr(_state, "qlbm_gauss_cz") else N/2
sigma = _state.qlbm_gauss_sigma if hasattr(_state, "qlbm_gauss_sigma") and _state.qlbm_gauss_sigma > 0 else 0.1
selected_func = lambda x, y, z: \
np.exp(-((x - cx)**2 / (2 * sigma**2) +
(y - cy)**2 / (2 * sigma**2) +
(z - cz)**2 / (2 * sigma**2))) * 1.8 + 0.2
title = f"Gaussian Distribution (N={N})"
elif distribution_type == "Multi-Dirac-Delta":
# Get log2 frequency multipliers from state
kx_log2 = int(_state.qlbm_mdd_kx_log2) if hasattr(_state, "qlbm_mdd_kx_log2") else 1
ky_log2 = int(_state.qlbm_mdd_ky_log2) if hasattr(_state, "qlbm_mdd_ky_log2") else 1
kz_log2 = int(_state.qlbm_mdd_kz_log2) if hasattr(_state, "qlbm_mdd_kz_log2") else 1
# Number of peaks per axis
num_peaks_x = 2 ** kx_log2
num_peaks_y = 2 ** ky_log2
num_peaks_z = 2 ** kz_log2
# Create a function that produces peaks at regular intervals
# Peaks are located at positions: N/(2*num_peaks) + i*N/num_peaks for i in 0..num_peaks-1
def multi_dirac_func(x, y, z):
# Use narrow Gaussians to approximate delta functions
delta_width = max(0.5, N / (8 * max(num_peaks_x, num_peaks_y, num_peaks_z)))
result = np.zeros_like(x, dtype=float)
for ix in range(num_peaks_x):
peak_x = (0.5 + ix) * N / num_peaks_x
for iy in range(num_peaks_y):
peak_y = (0.5 + iy) * N / num_peaks_y
for iz in range(num_peaks_z):
peak_z = (0.5 + iz) * N / num_peaks_z
result += np.exp(-(
(x - peak_x)**2 + (y - peak_y)**2 + (z - peak_z)**2
) / (2 * delta_width**2))
# Normalize to range [0.2, 2.0] for visibility
if result.max() > 0:
result = result / result.max() * 1.8 + 0.2
return result
selected_func = multi_dirac_func
title = f"Multi-Dirac-Delta (kx={num_peaks_x}, ky={num_peaks_y}, kz={num_peaks_z})"
else:
return go.Figure()
# Create 3D grid
x_indices = np.linspace(0, 1, N)
y_indices = np.linspace(0, 1, N)
z_indices = np.linspace(0, 1, N)
X, Y, Z = np.meshgrid(x_indices, y_indices, z_indices, indexing='ij')
# Calculate distribution values
xi = np.arange(0, N)
yi = np.arange(0, N)
zi = np.arange(0, N)
Xi, Yi, Zi = np.meshgrid(xi, yi, zi, indexing='ij')
values = selected_func(Xi, Yi, Zi)
# Create Plotly visualization
isomin = np.min(values)
isomax = np.max(values)
surface_count = 5
if distribution_type == "Sinusoidal":
isomin = 0.1
isomax = 1.9
surface_count = 4
data = [go.Isosurface(
x=X.flatten(),
y=Y.flatten(),
z=Z.flatten(),
value=values.flatten(),
isomin=isomin,
isomax=isomax,
surface_count=surface_count,
colorscale=_QLBM_PREVIEW_COLORSCALE,
opacity=0.35,
caps=dict(x_show=False, y_show=False, z_show=False)
)]
# Add translucent peach unit cube to give spatial frame
cube_x = [0, 1, 1, 0, 0, 1, 1, 0]
cube_y = [0, 0, 1, 1, 0, 0, 1, 1]
cube_z = [0, 0, 0, 0, 1, 1, 1, 1]
cube_color = "rgba(255,218,185,0.25)" # Peach with transparency
data.append(go.Mesh3d(
x=cube_x,
y=cube_y,
z=cube_z,
i=[7, 0, 0, 0, 4, 4, 6, 6, 4, 0, 3, 2],
j=[3, 4, 1, 2, 5, 6, 5, 2, 0, 1, 6, 3],
k=[0, 7, 2, 3, 6, 7, 1, 1, 5, 5, 7, 6],
opacity=0.18,
color=cube_color,
flatshading=True,
showscale=False,
name="Unit Cube"
))
cube_edge_x = [
0, 1, 1, 0, 0, None,
0, 1, 1, 0, 0, None,
0, 0, None,
1, 1, None,
1, 1, None,
0, 0
]
cube_edge_y = [
0, 0, 1, 1, 0, None,
0, 0, 1, 1, 0, None,
0, 0, None,
0, 0, None,
1, 1, None,
1, 1
]
cube_edge_z = [
0, 0, 0, 0, 0, None,
1, 1, 1, 1, 1, None,
0, 1, None,
0, 1, None,
0, 1, None,
0, 1
]
data.append(go.Scatter3d(
x=cube_edge_x,
y=cube_edge_y,
z=cube_edge_z,
mode='lines',
line=dict(color='#E3A079', width=3),
opacity=0.9,
name='Unit Cube Frame'
))
if show_edges:
# Create grid lines
Y_yz, Z_yz = np.meshgrid(y_indices, z_indices, indexing='ij')
Y_flat, Z_flat = Y_yz.flatten(), Z_yz.flatten()
num_lines = len(Y_flat)
xe = np.full(num_lines * 3, np.nan)
xe[0::3], xe[1::3] = 0, 1
ye = np.full(num_lines * 3, np.nan)
ye[0::3] = ye[1::3] = Y_flat
ze = np.full(num_lines * 3, np.nan)
ze[0::3] = ze[1::3] = Z_flat
X_xz, Z_xz = np.meshgrid(x_indices, z_indices, indexing='ij')
X_flat, Z_flat = X_xz.flatten(), Z_xz.flatten()
num_lines = len(X_flat)
xe_y = np.full(num_lines * 3, np.nan)
xe_y[0::3] = xe_y[1::3] = X_flat
ye_y = np.full(num_lines * 3, np.nan)
ye_y[0::3], ye_y[1::3] = 0, 1
ze_y = np.full(num_lines * 3, np.nan)
ze_y[0::3] = ze_y[1::3] = Z_flat
X_xy, Y_xy = np.meshgrid(x_indices, y_indices, indexing='ij')
X_flat, Y_flat = X_xy.flatten(), Y_xy.flatten()
num_lines = len(X_flat)
xe_z = np.full(num_lines * 3, np.nan)
xe_z[0::3] = xe_z[1::3] = X_flat
ye_z = np.full(num_lines * 3, np.nan)
ye_z[0::3] = ye_z[1::3] = Y_flat
ze_z = np.full(num_lines * 3, np.nan)
ze_z[0::3], ze_z[1::3] = 0, 1
x_all = np.concatenate([xe, xe_y, xe_z])
y_all = np.concatenate([ye, ye_y, ye_z])
z_all = np.concatenate([ze, ze_y, ze_z])
data.append(go.Scatter3d(
x=x_all, y=y_all, z=z_all,
mode='lines',
line=dict(color='black', width=1),
opacity=0.22,
name='Grid Edges'
))
fig = go.Figure(data=data)
fig.update_layout(
title=title,
scene=dict(
xaxis=dict(backgroundcolor="white", showbackground=True, gridcolor="lightgrey", zerolinecolor="lightgrey", title='X'),
yaxis=dict(backgroundcolor="white", showbackground=True, gridcolor="lightgrey", zerolinecolor="lightgrey", title='Y'),
zaxis=dict(backgroundcolor="white", showbackground=True, gridcolor="lightgrey", zerolinecolor="lightgrey", title='Z'),
),
margin=dict(l=0, r=0, b=0, t=40),
width=800,
height=700
)
return fig
def update_view():
"""Update the preview visualization."""
global current_grid_object
if _state is None:
return
# If simulation has run, don't update the preview
if _state.qlbm_simulation_has_run:
return
try:
N = int(_state.qlbm_nx)
distribution_type = _state.qlbm_dist_type
show_edges = _state.qlbm_show_edges
fig = get_initial_distribution_figure(distribution_type, N, show_edges)
if hasattr(_ctrl, "qlbm_preview_update"):
_ctrl.qlbm_preview_update(fig)
except Exception as e:
print(f"Error updating view: {e}")
def on_pick_point(point, *_) -> None:
"""Handle point picking on the 3D visualization."""
global current_grid_object
if point is None or current_grid_object is None:
return
closest_id = current_grid_object.find_closest_point(point)
if closest_id == -1:
return
values = current_grid_object.point_data.get('scalars')
if values is None:
return
coords = current_grid_object.points[closest_id]
val = float(values[closest_id])
x, y, z = coords
_set_pick_text(f"Position: ({x:.3f}, {y:.3f}, {z:.3f})\nValue: {val:.4g}")
if hasattr(_ctrl, "qlbm_view_update"):
_ctrl.qlbm_view_update()
# --- Geometry Figure ---
def get_geometry_figure():
"""Generates a 3D Plotly figure for the selected geometry."""
if _state is None:
return go.Figure()
geom = _state.qlbm_geometry_selection
if geom == "Cube":
fig = _create_box_figure(1, 1, 1, "Cube")
elif geom == "Rectangular domain with a heated box (3D)":
try:
L = float(_state.qlbm_domain_L)
W = float(_state.qlbm_domain_W)
H = float(_state.qlbm_domain_H)
except:
L, W, H = 1.0, 1.0, 1.0
max_dim = max(L, W, H)
if max_dim > 0:
L /= max_dim
W /= max_dim
H /= max_dim
fig = _create_box_figure(L, W, H, "Rectangular Domain")
else:
fig = go.Figure()
fig.update_layout(
scene=dict(xaxis=dict(visible=False), yaxis=dict(visible=False), zaxis=dict(visible=False)),
margin=dict(l=0, r=0, b=0, t=0),
)
return fig
fig.update_layout(
scene=dict(
xaxis=dict(visible=False),
yaxis=dict(visible=False),
zaxis=dict(visible=False),
aspectmode='data'
),
margin=dict(l=0, r=0, b=0, t=30),
)
return fig
def _create_box_figure(lx, ly, lz, title):
"""Create a 3D box figure."""
x = [0, lx, lx, 0, 0, lx, lx, 0]
y = [0, 0, ly, ly, 0, 0, ly, ly]
z = [0, 0, 0, 0, lz, lz, lz, lz]
fig = go.Figure()
fig.add_trace(go.Mesh3d(
x=x, y=y, z=z,
i=[7, 0, 0, 0, 4, 4, 6, 6, 4, 0, 3, 2],
j=[3, 4, 1, 2, 5, 6, 5, 2, 0, 1, 6, 3],
k=[0, 7, 2, 3, 6, 7, 1, 1, 5, 5, 7, 6],
opacity=0.2,
intensity=list(range(len(x))),
colorscale=_QLBM_PREVIEW_COLORSCALE,
flatshading=True,
name=title,
showscale=False
))
xe = [0, lx, lx, 0, 0, None, 0, lx, lx, 0, 0, None, 0, 0, None, lx, lx, None, lx, lx, None, 0, 0]
ye = [0, 0, ly, ly, 0, None, 0, 0, ly, ly, 0, None, 0, 0, None, 0, 0, None, ly, ly, None, ly, ly]
ze = [0, 0, 0, 0, 0, None, lz, lz, lz, lz, lz, None, 0, lz, None, 0, lz, None, 0, lz, None, 0, lz]
fig.add_trace(go.Scatter3d(
x=xe, y=ye, z=ze,
mode='lines',
line=dict(color='black', width=3),
showlegend=False
))
fig.update_layout(title=title)
return fig
def update_geometry_view():
"""Update the geometry visualization."""
try:
fig = get_geometry_figure()
if hasattr(_ctrl, "qlbm_geometry_plot_update"):
_ctrl.qlbm_geometry_plot_update(fig)
except Exception as e:
print(f"Error updating geometry view: {e}")
# --- CPU Demo Simulation ---
def _cpu_distribution_field(distribution_type: str, Xi, Yi, Zi, grid_size: int, drift, phase_fraction: float):
"""Generate the distribution field for CPU demo simulation."""
if _state is None:
return np.ones_like(Xi)
if distribution_type == "Sinusoidal":
kx = max(1.0, round(float(_state.qlbm_sine_k_x))) if hasattr(_state, "qlbm_sine_k_x") else 1.0
ky = max(1.0, round(float(_state.qlbm_sine_k_y))) if hasattr(_state, "qlbm_sine_k_y") else 1.0
kz = max(1.0, round(float(_state.qlbm_sine_k_z))) if hasattr(_state, "qlbm_sine_k_z") else 1.0
x_term = np.sin((np.mod(Xi + drift[0], grid_size)) * 2 * np.pi * kx / grid_size)
y_term = np.sin((np.mod(Yi + drift[1], grid_size)) * 2 * np.pi * ky / grid_size)
z_term = np.sin((np.mod(Zi + drift[2], grid_size)) * 2 * np.pi * kz / grid_size)
field = x_term * y_term * z_term + 1.0
else:
# Gaussian
nx_val = max(1.0, float(_state.qlbm_nx)) if hasattr(_state, "qlbm_nx") else float(grid_size)
cx = float(_state.qlbm_gauss_cx) if hasattr(_state, "qlbm_gauss_cx") else nx_val / 2
cy = float(_state.qlbm_gauss_cy) if hasattr(_state, "qlbm_gauss_cy") else nx_val / 2
cz = float(_state.qlbm_gauss_cz) if hasattr(_state, "qlbm_gauss_cz") else nx_val / 2
sigma = float(_state.qlbm_gauss_sigma) if hasattr(_state, "qlbm_gauss_sigma") else nx_val / 6
scale = (grid_size - 1) / nx_val if nx_val else 1.0
cx = cx * scale + drift[0]
cy = cy * scale + drift[1]
cz = cz * scale + drift[2]
sigma = max(1.0, sigma * scale)
field = np.exp(-(((Xi - cx) ** 2 + (Yi - cy) ** 2 + (Zi - cz) ** 2) / (2 * sigma ** 2))) * 1.8 + 0.2
modulation = 0.15 * np.sin(2 * np.pi * phase_fraction + (Xi + Yi + Zi) * np.pi / max(1, grid_size))
return field + modulation
def _run_cpu_demo_simulation(grid_size: int, T: int, distribution_type: str, vx_func, vy_func, vz_func, progress_callback=None):
"""Run CPU demo simulation."""
grid_size = int(max(8, min(grid_size, _CPU_DEMO_MAX_GRID)))
idx_coords = np.linspace(0, grid_size - 1, grid_size, dtype=np.float32)
Xi, Yi, Zi = np.meshgrid(idx_coords, idx_coords, idx_coords, indexing='ij')
geom_coords = np.linspace(0, 1, grid_size, dtype=np.float32)
Xg, Yg, Zg = np.meshgrid(geom_coords, geom_coords, geom_coords, indexing='ij')
if T <= 0:
target = 1.0
else:
target = float(T)
num_frames = min(30, max(2, int(min(target, 20)) + 1))
timeline = list(np.linspace(0.0, target, num_frames))
if len(timeline) < 2:
timeline.append(target)
vx = _safe_velocity_sample(vx_func)
vy = _safe_velocity_sample(vy_func)
vz = _safe_velocity_sample(vz_func)
drift_scale = 0.25 * grid_size
frames = []
for idx, t_val in enumerate(timeline):
phase_fraction = idx / (len(timeline) - 1) if len(timeline) > 1 else 0.0
drift = (
vx * phase_fraction * drift_scale,
vy * phase_fraction * drift_scale,
vz * phase_fraction * drift_scale,
)
field = _cpu_distribution_field(distribution_type, Xi, Yi, Zi, grid_size, drift, phase_fraction)
frames.append(field.astype(np.float32))
if progress_callback:
percent = int(((idx + 1) / len(timeline)) * 100)
progress_callback(percent)
grid = pv.StructuredGrid()
grid.points = np.column_stack((Xg.ravel(), Yg.ravel(), Zg.ravel()))
grid.dimensions = [grid_size, grid_size, grid_size]
grid["scalars"] = frames[0].ravel()
times = [float(t) for t in timeline]
return frames, times, grid
# --- Export Functions ---
def export_simulation_vtk():
"""Download the current simulation volume as a VTK file."""
global current_grid_object
if not _state.qlbm_simulation_has_run or current_grid_object is None:
log_to_console("VTK export unavailable: run a simulation first.")
return
temp_path = None
try:
suffix = datetime.now().strftime("%Y%m%d_%H%M%S")
grid_size = int(_state.qlbm_grid_size or 0)
filename = f"qlbm_volume_n{grid_size}_{suffix}.vts"
tmp = tempfile.NamedTemporaryFile(suffix=".vts", delete=False)
tmp.close()
temp_path = Path(tmp.name)
current_grid_object.save(str(temp_path))
_server.controller.download_file(temp_path.read_bytes(), filename)
log_to_console(f"Exported VTK to {filename}")
except Exception as exc:
log_to_console(f"VTK export failed: {exc}")
finally:
if temp_path and temp_path.exists():
try:
temp_path.unlink()
except Exception:
pass
def export_simulation_mp4():
"""Render the simulation frames to an MP4 animation for download."""
global simulation_data_frames, current_grid_object
if not _state.qlbm_simulation_has_run or not simulation_data_frames:
log_to_console("MP4 export unavailable: run a simulation first.")
return
if current_grid_object is None:
log_to_console("MP4 export failed: missing grid data.")
return
temp_path = None
movie_plotter = None
try:
suffix = datetime.now().strftime("%Y%m%d_%H%M%S")
grid_size = int(_state.qlbm_grid_size or 0)
filename = f"qlbm_animation_n{grid_size}_{suffix}.mp4"
tmp = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
tmp.close()
temp_path = Path(tmp.name)
movie_plotter = pv.Plotter(off_screen=True, window_size=(1280, 720))
try:
camera_position = _plotter.camera_position if _plotter and _plotter.camera_position else None
except Exception:
camera_position = None
base_grid = current_grid_object.copy()
movie_plotter.open_movie(str(temp_path), framerate=15)
for frame_data in simulation_data_frames:
base_grid["scalars"] = np.asarray(frame_data).ravel()
iso_mesh = base_grid.contour(isosurfaces=7, scalars="scalars")
movie_plotter.clear()
movie_plotter.add_mesh(
iso_mesh,
cmap="Blues",
opacity=0.35,
show_scalar_bar=False,
)
movie_plotter.add_axes()
if camera_position:
try:
movie_plotter.camera_position = camera_position
except Exception:
pass
else:
movie_plotter.view_isometric()
movie_plotter.render()
movie_plotter.write_frame()
movie_plotter.close()
movie_plotter = None
_server.controller.download_file(temp_path.read_bytes(), filename)
log_to_console(f"Exported MP4 to {filename}")
except Exception as exc:
log_to_console(f"MP4 export failed: {exc}")
finally:
if movie_plotter is not None:
try:
movie_plotter.close()
except Exception:
pass
if temp_path and temp_path.exists():
try:
temp_path.unlink()
except Exception:
pass
# --- Qiskit Simulation Functions ---
def _map_state_to_qiskit_params():
"""
Map qlbm_embedded state variables to qlbm_sample_app parameters.
Returns
-------
dict or None
Dictionary of parameters for run_sampling_sim, or None if state is unavailable
"""
if _state is None:
return None
# Map distribution type
dist_type = _state.qlbm_dist_type
if dist_type == "Sinusoidal":
init_state_name = "sin"
elif dist_type == "Gaussian":
init_state_name = "gaussian"
elif dist_type == "Multi-Dirac-Delta":
init_state_name = "multi_dirac_delta"
else:
init_state_name = "sin" # Default
# Calculate n from grid_size (grid_size = 2^n)
grid_size = int(_state.qlbm_grid_size)
n = int(math.log2(grid_size)) if grid_size > 0 else 3
# Map Gaussian parameters from grid units to normalized [0,1]
# In the UI, gauss_cx/cy/cz are in grid units (0 to nx)
# qlbm_sample_app expects normalized [0,1]
nx = float(_state.qlbm_nx) if _state.qlbm_nx else float(grid_size)
gauss_cx = float(_state.qlbm_gauss_cx) / nx if nx > 0 else 0.5
gauss_cy = float(_state.qlbm_gauss_cy) / nx if nx > 0 else 0.5
gauss_cz = float(_state.qlbm_gauss_cz) / nx if nx > 0 else 0.5
gauss_sigma = float(_state.qlbm_gauss_sigma) / nx if nx > 0 else 0.2
# Create T_list from time_steps: [1, 2, 3, ..., T]
time_steps = int(_state.qlbm_time_steps)
if time_steps <= 0:
T_list = [1]
else:
T_list = list(range(1, time_steps + 1))
return {
"n": n,
"init_state_name": init_state_name,
"sine_k_x": float(_state.qlbm_sine_k_x),
"sine_k_y": float(_state.qlbm_sine_k_y),
"sine_k_z": float(_state.qlbm_sine_k_z),
"gauss_cx": gauss_cx,
"gauss_cy": gauss_cy,
"gauss_cz": gauss_cz,
"gauss_sigma": gauss_sigma,
"mdd_kx_log2": int(_state.qlbm_mdd_kx_log2) if hasattr(_state, "qlbm_mdd_kx_log2") else 1,
"mdd_ky_log2": int(_state.qlbm_mdd_ky_log2) if hasattr(_state, "qlbm_mdd_ky_log2") else 1,
"mdd_kz_log2": int(_state.qlbm_mdd_kz_log2) if hasattr(_state, "qlbm_mdd_kz_log2") else 1,
"vx_expr": str(_state.qlbm_vx_expr),
"vy_expr": str(_state.qlbm_vy_expr),
"vz_expr": str(_state.qlbm_vz_expr),
"T_list": T_list,
"grid_size": grid_size,
}
def _run_qiskit_simulation(progress_callback=None):
"""
Run QLBM simulation using Qiskit Aer statevector simulator.
Parameters
----------
progress_callback : callable, optional
Function to report progress (0-100)
Returns
-------
output : list[ndarray]
List of 3D density arrays, one per timestep
fig : go.Figure
Plotly figure with slider animation
T_list : list[int]
List of timesteps
"""
if not _QISKIT_BACKEND_AVAILABLE:
raise RuntimeError(f"Qiskit backend not available: {_QISKIT_IMPORT_ERROR}")
params = _map_state_to_qiskit_params()
if params is None:
raise RuntimeError("Failed to map state parameters")
log_to_console(f"Qiskit Simulation Parameters:")
log_to_console(f" n={params['n']} (grid {params['grid_size']}³)")
log_to_console(f" T_list={params['T_list']}")
log_to_console(f" Distribution: {params['init_state_name']}")
log_to_console(f" Velocity: vx={params['vx_expr']}, vy={params['vy_expr']}, vz={params['vz_expr']}")
if progress_callback:
progress_callback(5)
# Create initial state circuit using qlbm_sample_app function
log_to_console("Creating initial state circuit...")
init_state_prep_circ = get_named_init_state_circuit(
n=params["n"],
init_state_name=params["init_state_name"],
sine_k_x=params["sine_k_x"],
sine_k_y=params["sine_k_y"],
sine_k_z=params["sine_k_z"],
gauss_cx=params["gauss_cx"],
gauss_cy=params["gauss_cy"],
gauss_cz=params["gauss_cz"],
gauss_sigma=params["gauss_sigma"],
mdd_kx_log2=params["mdd_kx_log2"],
mdd_ky_log2=params["mdd_ky_log2"],
mdd_kz_log2=params["mdd_kz_log2"],
)
if progress_callback:
progress_callback(15)
log_to_console("Running Qiskit Aer statevector simulation...")
log_to_console(f" Processing {len(params['T_list'])} timestep(s)...")
# Determine velocity resolution (cap for performance)
vel_resolution = min(params['grid_size'], 32)
# Define a progress wrapper to map 0-100% simulation progress to 15-95% overall progress
def sim_progress_wrapper(p):
if progress_callback:
# Map 0-100 -> 15-95
weighted_p = 15 + (p / 100.0) * (95 - 15)
progress_callback(weighted_p)
# Run simulation using qlbm_sample_app function
output, fig = run_sampling_sim(
n=params["n"],
ux=params["vx_expr"],
uy=params["vy_expr"],
uz=params["vz_expr"],
init_state_prep_circ=init_state_prep_circ,
T_list=params["T_list"],
vel_resolution=vel_resolution,
progress_callback=sim_progress_wrapper,
)
if progress_callback:
progress_callback(95)
log_to_console(f"Simulation complete: {len(output)} frame(s) generated")
return output, fig, params["T_list"]
# --- Job Result Upload Processing ---
def process_uploaded_job_result():
"""
Process an IBM or IonQ job by retrieving it directly using the Job ID.
This function:
1. Takes the Job ID from user input (or extracts from uploaded filename)
2. Connects to IBM/IonQ based on platform selection and retrieves the job
3. Processes the job results (IBM: job.result(), IonQ: job.get_counts(i))
4. Calls load_samples/estimate_density for each timestep
5. Generates the slider figure using plot_density_isosurface_slider
"""
global simulation_data_frames, simulation_times, current_grid_object
if _state is None:
return
# Validate required imports
if not _VISUALIZE_COUNTS_AVAILABLE:
_state.qlbm_job_upload_error = "visualize_counts module not available. Cannot process job results."
log_to_console("Error: visualize_counts module not available")
return
# Get job ID from text field
job_id = None
if _state.qlbm_job_id and str(_state.qlbm_job_id).strip():
job_id = str(_state.qlbm_job_id).strip()
# Remove .json extension if present
if job_id.endswith(".json"):
job_id = job_id[:-5]
log_to_console(f"Using Job ID from text field: {job_id}")
if not job_id:
_state.qlbm_job_upload_error = "No Job ID provided. Please enter a Job ID."
return
# Get platform selection
platform = _state.qlbm_job_platform or "IonQ"
# Reset messages
_state.qlbm_job_upload_error = ""
_state.qlbm_job_upload_success = ""
_state.qlbm_job_is_processing = True
log_to_console(f"Processing {platform} Job ID: {job_id}")
try:
# Parse timesteps from user input
try:
total_time = int(_state.qlbm_job_total_time or 3)
if total_time < 1:
total_time = 1
T_list = list(range(1, total_time + 1))
except ValueError:
_state.qlbm_job_upload_error = "Invalid Total Time. Please enter a positive integer."
_state.qlbm_job_is_processing = False
return
log_to_console(f"Timesteps to process: {T_list}")
# Get processing parameters
output_resolution = int(_state.qlbm_job_output_resolution or 40)
# Platform-specific parameters
if platform == "IBM":
flag_qubits = True
midcircuit_meas = True # IBM uses midcircuit_meas=True
else: # IonQ
flag_qubits = True
midcircuit_meas = False # IonQ uses midcircuit_meas=False
log_to_console(f"Platform: {platform}, Resolution: {output_resolution}, Flag qubits: {flag_qubits}, Midcircuit meas: {midcircuit_meas}")
output = []
if platform == "IBM":
# === IBM Job Retrieval ===
log_to_console("Connecting to IBM Quantum...")
try:
from qiskit_ibm_runtime import QiskitRuntimeService
except ImportError:
_state.qlbm_job_upload_error = "qiskit_ibm_runtime package not available. Please install it."
_state.qlbm_job_is_processing = False
log_to_console("Error: qiskit_ibm_runtime not installed")
return
# Get API token from environment
ibm_token = os.environ.get("API_KEY_IBM_QLBM")
if not ibm_token:
_state.qlbm_job_upload_error = "IBM API token not found. Set API_KEY_IBM_QLBM environment variable."
_state.qlbm_job_is_processing = False
log_to_console("Error: IBM API token not found in environment")
return
# Set up IBM service (same as run_sampling_hw_ibm)
try:
service = QiskitRuntimeService(
channel="ibm_cloud",
token=ibm_token,
instance="crn:v1:bluemix:public:quantum-computing:us-east:a/15157e4350c04a9dab51b8b8a4a93c86:e29afd91-64bf-4a82-8dbf-731e6c213595::",
)
log_to_console("Connected to IBM Quantum service")
except Exception as e:
_state.qlbm_job_upload_error = f"Failed to connect to IBM Quantum: {e}"
_state.qlbm_job_is_processing = False
log_to_console(f"Error connecting to IBM: {e}")
return
# Retrieve the job
log_to_console(f"Retrieving IBM job: {job_id}")
try:
job = service.job(job_id)
except Exception as e:
_state.qlbm_job_upload_error = f"Failed to retrieve IBM job: {e}"
_state.qlbm_job_is_processing = False
log_to_console(f"Error retrieving job: {e}")
return
# Check job status
try:
status = job.status()
status_name = status.name if hasattr(status, 'name') else str(status)
log_to_console(f"Job status: {status_name}")
if status_name not in ('DONE', 'COMPLETED'):
_state.qlbm_job_upload_error = f"Job is not complete. Current status: {status_name}"
_state.qlbm_job_is_processing = False
return
except Exception as e:
log_to_console(f"Warning: Could not check job status: {e}")
# Get results (same as run_sampling_hw_ibm)
log_to_console("Retrieving IBM job results...")
try:
result = job.result()
log_to_console("Results retrieved successfully")
except Exception as e:
_state.qlbm_job_upload_error = f"Failed to get job results: {e}"
_state.qlbm_job_is_processing = False
log_to_console(f"Error getting results: {e}")
return
# Process results (same pattern as run_sampling_hw_ibm)
log_to_console("Processing IBM job results...")
for idx, (T_total, pub) in enumerate(zip(T_list, result)):
try:
log_to_console(f"Processing timestep T={T_total} (circuit {idx})...")
# Get counts (same as run_sampling_hw_ibm)
try:
joined = pub.join_data()
counts = joined.get_counts()
except Exception as e:
log_to_console(f"Error retrieving counts for T={T_total}: {e}")
continue
log_to_console(f" Retrieved {len(counts)} unique bitstrings")
# Debug: show a few sample bitstrings
sample_count = 0
for bs, cnt in counts.items():
if sample_count < 3:
log_to_console(f" Sample: {bs} (count={cnt})")
sample_count += 1
# Process samples (same as run_sampling_hw_ibm)
pts, processed_counts = load_samples(
counts, T_total,
logger=log_to_console,
flag_qubits=flag_qubits,
midcircuit_meas=midcircuit_meas
)
log_to_console(f" load_samples returned {len(pts)} valid sample points")
# Estimate density
density = estimate_density(pts, processed_counts, bandwidth=0.05, grid_size=output_resolution)
output.append(density)
except Exception as e:
log_to_console(f"Error processing timestep {idx}: {e}")
import traceback
log_to_console(traceback.format_exc())
else:
# === IonQ Job Retrieval ===
log_to_console("Connecting to IonQ...")
try:
from qiskit_ionq import IonQProvider
except ImportError:
_state.qlbm_job_upload_error = "qiskit_ionq package not available. Please install it."
_state.qlbm_job_is_processing = False
log_to_console("Error: qiskit_ionq not installed")
return
# Get API token from environment (same pattern as run_sampling_hw_ionq)
ionq_token = os.environ.get("API_KEY_IONQ_QLBM") or os.environ.get("IONQ_API_TOKEN")
if not ionq_token:
_state.qlbm_job_upload_error = "IonQ API token not found. Set API_KEY_IONQ_QLBM environment variable."
_state.qlbm_job_is_processing = False
log_to_console("Error: IonQ API token not found in environment")
return
# Set the IONQ_API_TOKEN env var so IonQProvider() can find it (same as run_sampling_hw_ionq)
os.environ.setdefault("IONQ_API_TOKEN", ionq_token)
# Set up the IonQ provider and backend (IonQProvider reads from IONQ_API_TOKEN env var)
provider = IonQProvider()
backend = provider.get_backend("qpu.forte-enterprise-1")
backend_name = backend.name if isinstance(backend.name, str) else backend.name()
log_to_console(f"Connected to IonQ backend: {backend_name}")
# Retrieve the job
log_to_console(f"Retrieving IonQ job: {job_id}")
try:
job = backend.retrieve_job(job_id)
except Exception as e:
_state.qlbm_job_upload_error = f"Failed to retrieve IonQ job: {e}"
_state.qlbm_job_is_processing = False
log_to_console(f"Error retrieving job: {e}")
return
# Check job status
try:
status = job.status()
status_name = status.name if hasattr(status, 'name') else str(status)
log_to_console(f"Job status: {status_name}")
if status_name not in ('DONE', 'COMPLETED'):
_state.qlbm_job_upload_error = f"Job is not complete. Current status: {status_name}"
_state.qlbm_job_is_processing = False
return
except Exception as e:
log_to_console(f"Warning: Could not check job status: {e}")
# Process results (same as run_sampling_hw_ionq)
log_to_console("Processing IonQ job results...")
for i, T_total in enumerate(T_list):
try:
log_to_console(f"Processing timestep T={T_total} (circuit {i})...")
# Get counts directly from job (same as run_sampling_hw_ionq)
counts = job.get_counts(i)
log_to_console(f" Retrieved {len(counts)} unique bitstrings")
# Debug: show a few sample bitstrings
sample_count = 0
for bs, cnt in counts.items():
if sample_count < 3:
log_to_console(f" Sample: {bs} (count={cnt})")
sample_count += 1
# Process samples (same as run_sampling_hw_ionq)
pts, processed_counts = load_samples(
counts, T_total,
logger=log_to_console,
flag_qubits=flag_qubits,
midcircuit_meas=midcircuit_meas
)
log_to_console(f" load_samples returned {len(pts)} valid sample points")
# Estimate density
density = estimate_density(pts, processed_counts, bandwidth=0.05, grid_size=output_resolution)
output.append(density)
except IndexError:
log_to_console(f"Warning: No data found for timestep T={T_total} (circuit {i})")
break
except Exception as e:
log_to_console(f"Error processing timestep {i}: {e}")
import traceback
log_to_console(traceback.format_exc())
if not output:
_state.qlbm_job_upload_error = "No valid data extracted from job. Check timesteps parameter."
_state.qlbm_job_is_processing = False
return
log_to_console(f"Processed {len(output)} timestep(s) successfully")
# Generate the Plotly figure
fig = plot_density_isosurface_slider(output, T_list[:len(output)])
# Update state to show results
_state.qlbm_qiskit_mode = True
_state.qlbm_qiskit_fig = fig
_state.qlbm_simulation_has_run = True
_state.qlbm_job_upload_success = f"✓ Successfully processed {len(output)} timestep(s) from {platform} job {job_id}"
# Update the Plotly figure widget
if hasattr(_ctrl, "qlbm_qiskit_result_update"):
_ctrl.qlbm_qiskit_result_update(fig)
log_to_console(f"Results ready! {len(output)} frames generated.")
except Exception as e:
_state.qlbm_job_upload_error = f"Error processing job: {e}"
log_to_console(f"Processing error: {e}")
import traceback
log_to_console(traceback.format_exc())
finally:
_state.qlbm_job_is_processing = False
# --- Main Simulation ---
def run_simulation():
"""
Entry point for simulation - launches the async worker.
This is called by the UI button click and schedules the async task.
"""
if _server is None:
log_to_console("Error: Server not available")
return
# Schedule the async simulation
asyncio.ensure_future(_run_simulation_async())
async def _run_simulation_async():
"""
Async simulation runner that uses thread pool for blocking work.
This allows the UI to update in real-time during simulation.
"""
global simulation_data_frames, simulation_times, current_grid_object, _plotter, _qlbm_main_loop
from concurrent.futures import ThreadPoolExecutor
# Capture the main event loop for thread-safe callbacks
_qlbm_main_loop = asyncio.get_event_loop()
# Create executor for blocking operations
executor = ThreadPoolExecutor(max_workers=1)
loop = _qlbm_main_loop
if not _SIMULATION_CAN_RUN:
msg = _SIMULATION_DISABLED_REASON or "Simulation backend is not available on this platform."
_state.qlbm_run_error = msg
log_to_console(f"Error: {msg}")
_state.qlbm_status_message = "Error: Backend unavailable"
_state.qlbm_status_type = "error"
await _qlbm_flush_async()
executor.shutdown(wait=False)
return
_state.qlbm_is_running = True
_state.qlbm_run_error = ""
_state.qlbm_simulation_has_run = False
_state.qlbm_qiskit_mode = False # Reset Qiskit mode
_state.qlbm_show_progress = True
_state.qlbm_simulation_progress = 0
_state.qlbm_status_message = "Initializing simulation..."
_state.qlbm_status_type = "info"
await _qlbm_flush_async()
# Start heartbeat for continuous progress updates
_qlbm_start_progress_heartbeat()
# Determine if using Qiskit backend
use_qiskit = (
_state.qlbm_backend_type == "Simulator" and
_state.qlbm_selected_simulator == "IBM Qiskit simulator" and
_QISKIT_BACKEND_AVAILABLE
)
use_ibm_qpu = (
_state.qlbm_backend_type == "QPU" and
_state.qlbm_selected_qpu == "IBM QPU" and
_QISKIT_BACKEND_AVAILABLE
)
use_ionq_qpu = (
_state.qlbm_backend_type == "QPU" and
_state.qlbm_selected_qpu == "IonQ QPU" and
_QISKIT_BACKEND_AVAILABLE
)
# Log initial configuration
backend_info = f"{_state.qlbm_backend_type}"
if _state.qlbm_backend_type == "Simulator":
backend_info += f" - {_state.qlbm_selected_simulator}"
elif _state.qlbm_backend_type == "QPU":
backend_info += f" - {_state.qlbm_selected_qpu}"
log_to_console("Job Initiated")
log_to_console(f"Grid Size: {_state.qlbm_grid_size}×{_state.qlbm_grid_size}×{_state.qlbm_grid_size}, Time Steps: {_state.qlbm_time_steps}, Distribution: {_state.qlbm_dist_type}, Boundary: {_state.qlbm_boundary_condition}, Backend: {backend_info}, Velocity: vx={_state.qlbm_vx_expr}, vy={_state.qlbm_vy_expr}, vz={_state.qlbm_vz_expr}")
# Progress callback that uses thread-safe flush for real-time updates
last_logged_percent = [0] # Use list for nonlocal in nested function
def _progress_callback(percent):
_state.qlbm_simulation_progress = percent
if percent - last_logged_percent[0] >= 10:
log_to_console(f"Simulation progress: {int(percent)}%")
last_logged_percent[0] = percent
_qlbm_flush_state_threadsafe() # Thread-safe flush!
# QPU progress callback with status message support
def _qpu_progress_callback(percent, message=None):
_state.qlbm_simulation_progress = percent
if message:
_state.qlbm_status_message = message
_qlbm_flush_state_threadsafe()
try:
# === Qiskit Backend (IBM Qiskit Simulator) ===
if use_qiskit:
log_to_console("Using IBM Qiskit Simulator backend...")
_state.qlbm_status_message = "Running Qiskit Aer simulation..."
await _qlbm_flush_async()
# Run Qiskit simulation in executor to keep UI responsive
def _run_qiskit_blocking():
return _run_qiskit_simulation(progress_callback=_progress_callback)
output, plotly_fig, T_list = await loop.run_in_executor(executor, _run_qiskit_blocking)
# Store results
simulation_data_frames = output
simulation_times = [float(t) for t in T_list]
# Update the Plotly figure widget for Qiskit results
if hasattr(_ctrl, "qlbm_qiskit_result_update"):
_ctrl.qlbm_qiskit_result_update(plotly_fig)
_state.qlbm_max_time_step = len(output) - 1
_state.qlbm_time_val = 0
_state.qlbm_time_slider_labels = [f"T={t}" for t in T_list]
_state.qlbm_simulation_has_run = True
_state.qlbm_qiskit_mode = True # Use Plotly display instead of PyVista
_state.qlbm_simulation_progress = 100
log_to_console("Qiskit simulation completed successfully.")
_state.qlbm_status_message = "Simulation completed successfully."
_state.qlbm_status_type = "success"
_state.qlbm_show_progress = False
_qlbm_auto_hide_status_window(3.0) # Auto-hide after 3 seconds
await _qlbm_flush_async()
# === IBM QPU Backend ===
elif use_ibm_qpu:
log_to_console("Using IBM QPU backend...")
_state.qlbm_status_message = "Step 1: Preparing IBM QPU job..."
_state.qlbm_simulation_progress = 0
await _qlbm_flush_async()
params = _map_state_to_qiskit_params()
if params is None:
raise RuntimeError("Failed to map state parameters")
# Create initial state circuit (part of Step 1)
log_to_console("Creating initial state circuit...")
_state.qlbm_simulation_progress = 2
await _qlbm_flush_async()
init_state_prep_circ = get_named_init_state_circuit(
n=params["n"],
init_state_name=params["init_state_name"],
sine_k_x=params["sine_k_x"],
sine_k_y=params["sine_k_y"],
sine_k_z=params["sine_k_z"],
gauss_cx=params["gauss_cx"],
gauss_cy=params["gauss_cy"],
gauss_cz=params["gauss_cz"],
gauss_sigma=params["gauss_sigma"],
mdd_kx_log2=params["mdd_kx_log2"],
mdd_ky_log2=params["mdd_ky_log2"],
mdd_kz_log2=params["mdd_kz_log2"],
)
_state.qlbm_simulation_progress = 5
_state.qlbm_status_message = "Step 1: Circuit generation..."
await _qlbm_flush_async()
# Run HW simulation in executor with progress callback
def _run_ibm_qpu_blocking():
job, get_result = run_sampling_hw_ibm(
n=params["n"],
ux=params["vx_expr"],
uy=params["vy_expr"],
uz=params["vz_expr"],
init_state_prep_circ=init_state_prep_circ,
T_list=params["T_list"],
shots=2**14,
vel_resolution=min(params['grid_size'], 32),
output_resolution=min(2*params['grid_size'], 40),
logger=log_to_console,
progress_callback=_qpu_progress_callback,
)
# get_result already handles progress updates internally
output, plotly_fig = get_result(job)
return output, plotly_fig, init_state_prep_circ
output, plotly_fig, init_state_prep_circ = await loop.run_in_executor(executor, _run_ibm_qpu_blocking)
# Step 3: Finalizing results (T=0 snapshot removed - only show T=1 onwards)
_state.qlbm_simulation_progress = 92
_state.qlbm_status_message = "Step 3: Finalizing results..."
await _qlbm_flush_async()
# Use T_list directly (no T=0 prepend)
result_T_list = list(params["T_list"])
log_to_console(f"Results available for T={result_T_list}")
# Store results
simulation_data_frames = output
simulation_times = [float(t) for t in result_T_list]
# Update UI
if hasattr(_ctrl, "qlbm_qiskit_result_update"):
_ctrl.qlbm_qiskit_result_update(plotly_fig)
_state.qlbm_max_time_step = len(output) - 1
_state.qlbm_time_val = 0
_state.qlbm_time_slider_labels = [f"T={t}" for t in result_T_list]
_state.qlbm_simulation_has_run = True
_state.qlbm_qiskit_mode = True
_state.qlbm_simulation_progress = 100
log_to_console("IBM QPU simulation completed successfully.")
_state.qlbm_status_message = "Simulation completed successfully."
_state.qlbm_status_type = "success"
_state.qlbm_show_progress = False
_qlbm_auto_hide_status_window(3.0) # Auto-hide after 3 seconds
await _qlbm_flush_async()
# === IonQ QPU Backend ===
elif use_ionq_qpu:
log_to_console("Using IonQ QPU backend...")
_state.qlbm_status_message = "Step 1: Preparing IonQ QPU job..."
_state.qlbm_simulation_progress = 0
await _qlbm_flush_async()
params = _map_state_to_qiskit_params()
if params is None:
raise RuntimeError("Failed to map state parameters")
# Create initial state circuit (part of Step 1)
log_to_console("Creating initial state circuit...")
_state.qlbm_simulation_progress = 2
await _qlbm_flush_async()
init_state_prep_circ = get_named_init_state_circuit(
n=params["n"],
init_state_name=params["init_state_name"],
sine_k_x=params["sine_k_x"],
sine_k_y=params["sine_k_y"],
sine_k_z=params["sine_k_z"],
gauss_cx=params["gauss_cx"],
gauss_cy=params["gauss_cy"],
gauss_cz=params["gauss_cz"],
gauss_sigma=params["gauss_sigma"],
mdd_kx_log2=params["mdd_kx_log2"],
mdd_ky_log2=params["mdd_ky_log2"],
mdd_kz_log2=params["mdd_kz_log2"],
)
_state.qlbm_simulation_progress = 5
_state.qlbm_status_message = "Step 1: Circuit generation..."
await _qlbm_flush_async()
# Run IonQ HW simulation in executor with progress callback
def _run_ionq_qpu_blocking():
job, get_result = run_sampling_hw_ionq(
n=params["n"],
ux=params["vx_expr"],
uy=params["vy_expr"],
uz=params["vz_expr"],
init_state_prep_circ=init_state_prep_circ,
T_list=params["T_list"],
shots=2**14,
vel_resolution=min(params['grid_size'], 32),
output_resolution=min(2*params['grid_size'], 40),
logger=log_to_console,
progress_callback=_qpu_progress_callback,
)
# get_result already handles progress updates internally
output, plotly_fig = get_result(job)
return output, plotly_fig, init_state_prep_circ
output, plotly_fig, init_state_prep_circ = await loop.run_in_executor(executor, _run_ionq_qpu_blocking)
# Step 3: Finalizing results (T=0 snapshot removed - only show T=1 onwards)
_state.qlbm_simulation_progress = 92
_state.qlbm_status_message = "Step 3: Finalizing results..."
await _qlbm_flush_async()
# Use T_list directly (no T=0 prepend)
result_T_list = list(params["T_list"])
log_to_console(f"Results available for T={result_T_list}")
# Store results
simulation_data_frames = output
simulation_times = [float(t) for t in result_T_list]
# Update UI
if hasattr(_ctrl, "qlbm_qiskit_result_update"):
_ctrl.qlbm_qiskit_result_update(plotly_fig)
_state.qlbm_max_time_step = len(output) - 1
_state.qlbm_time_val = 0
_state.qlbm_time_slider_labels = [f"T={t}" for t in result_T_list]
_state.qlbm_simulation_has_run = True
_state.qlbm_qiskit_mode = True
_state.qlbm_simulation_progress = 100
log_to_console("IonQ QPU simulation completed successfully.")
_state.qlbm_status_message = "Simulation completed successfully."
_state.qlbm_status_type = "success"
_state.qlbm_show_progress = False
_qlbm_auto_hide_status_window(3.0) # Auto-hide after 3 seconds
await _qlbm_flush_async()
# === CUDA-Q Backend ===
elif _state.qlbm_backend_type == "Simulator" and _state.qlbm_selected_simulator == "CUDA-Q simulator":
_state.qlbm_qiskit_mode = False # Use PyVista display
_state.qlbm_status_message = "Running CUDA-Q simulation..."
await _qlbm_flush_async()
grid_size = int(_state.qlbm_grid_size)
num_reg_qubits = int(math.log2(grid_size)) if grid_size > 0 else 3
T = int(_state.qlbm_time_steps)
distribution_type = _state.qlbm_dist_type
boundary_condition = _state.qlbm_boundary_condition
vx_func = make_velocity_func(_state.qlbm_vx_expr)
vy_func = make_velocity_func(_state.qlbm_vy_expr)
vz_func = make_velocity_func(_state.qlbm_vz_expr)
_state.qlbm_simulation_progress = 5
await _qlbm_flush_async()
if simulate_qlbm_3D_and_animate is not None:
log_to_console("Running CUDA-Q Simulation...")
# Run CUDA-Q simulation in executor
def _run_cudaq_blocking():
_plotter.clear()
return simulate_qlbm_3D_and_animate(
num_reg_qubits=num_reg_qubits,
T=T,
distribution_type=distribution_type,
vx_input=vx_func,
vy_input=vy_func,
vz_input=vz_func,
boundary_condition=boundary_condition,
plotter=_plotter,
add_slider=False,
progress_callback=_progress_callback
)
result = await loop.run_in_executor(executor, _run_cudaq_blocking)
_, frames, times, grid_obj = result
else:
# Fallback to CPU demo if CUDA-Q not available
log_to_console("CUDA-Q not available, falling back to CPU Demo...")
def _run_cpu_demo_blocking():
return _run_cpu_demo_simulation(
grid_size=grid_size,
T=T,
distribution_type=distribution_type or "Sinusoidal",
vx_func=vx_func,
vy_func=vy_func,
vz_func=vz_func,
progress_callback=_progress_callback
)
frames, times, grid_obj = await loop.run_in_executor(executor, _run_cpu_demo_blocking)
_state.qlbm_simulation_progress = 95
await _qlbm_flush_async()
# Update plotter with results
if grid_obj:
_plotter.clear()
isosurfaces = grid_obj.contour(isosurfaces=7, scalars="scalars")
_plotter.add_mesh(isosurfaces, cmap="turbo", opacity=0.3, show_scalar_bar=True)
_plotter.add_axes()
_plotter.show_grid()
# Store Results
if frames and len(frames) > 0:
simulation_data_frames = frames
simulation_times = times
current_grid_object = grid_obj
_state.qlbm_max_time_step = len(frames) - 1
_state.qlbm_time_val = 0
_state.qlbm_time_slider_labels = [f"{t:.1f}" for t in times] if times else [str(i) for i in range(len(frames))]
_state.qlbm_simulation_has_run = True
_ensure_point_picking(on_pick_point)
if hasattr(_ctrl, "qlbm_view_update"):
_ctrl.qlbm_view_update()
log_to_console("Simulation completed successfully.")
_state.qlbm_status_message = "Simulation completed successfully."
_state.qlbm_status_type = "success"
_state.qlbm_simulation_progress = 100
_state.qlbm_show_progress = False
_qlbm_auto_hide_status_window(3.0) # Auto-hide after 3 seconds
await _qlbm_flush_async()
else:
_state.qlbm_run_error = "Simulation produced no data."
log_to_console("Error: Simulation produced no data.")
_state.qlbm_status_message = "Error: No data produced"
_state.qlbm_status_type = "error"
await _qlbm_flush_async()
# === CPU Demo Backend (for QPU or fallback) ===
else:
_state.qlbm_qiskit_mode = False # Use PyVista display
_state.qlbm_status_message = "Running CPU Demo simulation..."
await _qlbm_flush_async()
grid_size = int(_state.qlbm_grid_size)
num_reg_qubits = int(math.log2(grid_size)) if grid_size > 0 else 3
T = int(_state.qlbm_time_steps)
distribution_type = _state.qlbm_dist_type
boundary_condition = _state.qlbm_boundary_condition
vx_func = make_velocity_func(_state.qlbm_vx_expr)
vy_func = make_velocity_func(_state.qlbm_vy_expr)
vz_func = make_velocity_func(_state.qlbm_vz_expr)
_progress_callback(0)
_state.qlbm_simulation_progress = 5
await _qlbm_flush_async()
# CPU Demo Simulation in executor
log_to_console("Running CPU Demo Simulation...")
def _run_cpu_demo_fallback():
return _run_cpu_demo_simulation(
grid_size=grid_size,
T=T,
distribution_type=distribution_type or "Sinusoidal",
vx_func=vx_func,
vy_func=vy_func,
vz_func=vz_func,
progress_callback=_progress_callback
)
frames, times, grid_obj = await loop.run_in_executor(executor, _run_cpu_demo_fallback)
_state.qlbm_simulation_progress = 95
await _qlbm_flush_async()
# Update plotter with results
if grid_obj:
_plotter.clear()
isosurfaces = grid_obj.contour(isosurfaces=7, scalars="scalars")
_plotter.add_mesh(isosurfaces, cmap="Blues", opacity=0.3, show_scalar_bar=True)
_plotter.add_axes()
_plotter.show_grid()
# Store Results
if frames and len(frames) > 0:
simulation_data_frames = frames
simulation_times = times
current_grid_object = grid_obj
_state.qlbm_max_time_step = len(frames) - 1
_state.qlbm_time_val = 0
_state.qlbm_time_slider_labels = [f"{t:.1f}" for t in times] if times else [str(i) for i in range(len(frames))]
_state.qlbm_simulation_has_run = True
_ensure_point_picking(on_pick_point)
if hasattr(_ctrl, "qlbm_view_update"):
_ctrl.qlbm_view_update()
log_to_console("Simulation completed successfully.")
_state.qlbm_status_message = "Simulation completed successfully."
_state.qlbm_status_type = "success"
_state.qlbm_simulation_progress = 100
_state.qlbm_show_progress = False
_qlbm_auto_hide_status_window(3.0) # Auto-hide after 3 seconds
await _qlbm_flush_async()
else:
_state.qlbm_run_error = "Simulation produced no data."
log_to_console("Error: Simulation produced no data.")
_state.qlbm_status_message = "Error: No data produced"
_state.qlbm_status_type = "error"
await _qlbm_flush_async()
except Exception as e:
_state.qlbm_run_error = f"Simulation failed: {str(e)}"
log_to_console(f"Simulation Error: {e}")
print(f"Simulation Error: {e}")
import traceback
traceback.print_exc()
_state.qlbm_status_message = "Simulation failed"
_state.qlbm_status_type = "error"
await _qlbm_flush_async()
finally:
_state.qlbm_is_running = False
_qlbm_stop_progress_heartbeat()
executor.shutdown(wait=False)
if _state.qlbm_status_type != "success":
_state.qlbm_show_progress = False
await _qlbm_flush_async()
def stop_simulation():
"""Stop the running simulation."""
if _state is None:
return
_state.qlbm_is_running = False
log_to_console("Simulation stopped by user")
def reset_simulation():
"""Reset the simulation state."""
global _plotter
if _state is None:
return
_state.qlbm_is_running = False
_state.qlbm_run_error = ""
_state.qlbm_simulation_has_run = False
_state.qlbm_qiskit_mode = False # Reset Qiskit mode
_state.qlbm_dist_type = None
_state.qlbm_show_edges = False
_state.qlbm_problems_selection = None
_state.qlbm_geometry_selection = None
_state.qlbm_backend_type = None
_state.qlbm_advecting_field = None
_state.qlbm_show_advect_params = False
if _plotter:
_plotter.clear()
if hasattr(_ctrl, "qlbm_view_update"):
_ctrl.qlbm_view_update()
_apply_workflow_highlights(_determine_workflow_step())
log_to_console("Simulation reset")
def _register_handlers():
"""Register state change handlers."""
@_state.change("qlbm_advecting_field")
def _on_advect_dropdown_change(qlbm_advecting_field, **_):
if qlbm_advecting_field:
set_velocity_preset(qlbm_advecting_field)
_apply_workflow_highlights(_determine_workflow_step())
@_state.change("qlbm_grid_index")
def _on_grid_index_change(qlbm_grid_index, **_):
"""Map discrete slider index to allowed grid sizes."""
try:
if qlbm_grid_index is None:
return
if isinstance(qlbm_grid_index, (int, float)):
idx = int(qlbm_grid_index)
idx = max(0, min(idx, len(GRID_SIZES) - 1))
val = GRID_SIZES[idx]
if _state.qlbm_grid_size != val:
_state.qlbm_grid_size = val
fig, info, warn = update_qubit_3D_info(val)
_state.qlbm_qubit_grid_info = info
_state.qlbm_qubit_warning = warn
if hasattr(_ctrl, "qlbm_qubit_plot_update"):
_ctrl.qlbm_qubit_plot_update(fig)
if _state.qlbm_nx != val:
_state.qlbm_nx = val
_state.qlbm_gauss_cx = val / 2
_state.qlbm_gauss_cy = val / 2
_state.qlbm_gauss_cz = val / 2
_state.qlbm_show_edges = True
update_view()
except Exception:
pass
finally:
_apply_workflow_highlights(_determine_workflow_step())
@_state.change("qlbm_problems_selection")
def _on_problem_selection_change(qlbm_problems_selection, **_):
"""Auto-select geometry based on the chosen problem."""
try:
if not qlbm_problems_selection:
_state.qlbm_geometry_selection = None
return
if isinstance(qlbm_problems_selection, str):
normalized = qlbm_problems_selection.strip()
_state.qlbm_geometry_selection = _PROBLEM_GEOMETRY_MAP.get(normalized)
else:
_state.qlbm_geometry_selection = None
except Exception:
_state.qlbm_geometry_selection = None
finally:
_apply_workflow_highlights(_determine_workflow_step())
@_state.change("qlbm_dist_type")
def _on_dist_type_change(qlbm_dist_type, **_):
if _state.qlbm_show_edges:
_state.qlbm_show_edges = False
update_view()
_apply_workflow_highlights(_determine_workflow_step())
@_state.change("qlbm_show_edges", "qlbm_sine_k_x", "qlbm_sine_k_y", "qlbm_sine_k_z",
"qlbm_gauss_cx", "qlbm_gauss_cy", "qlbm_gauss_cz", "qlbm_gauss_sigma",
"qlbm_mdd_kx_log2", "qlbm_mdd_ky_log2", "qlbm_mdd_kz_log2")
def on_param_change(**kwargs):
update_view()
_apply_workflow_highlights(_determine_workflow_step())
@_state.change("qlbm_geometry_selection", "qlbm_domain_L", "qlbm_domain_W", "qlbm_domain_H")
def _on_geometry_selection_change(**_):
update_geometry_view()
_apply_workflow_highlights(_determine_workflow_step())
@_state.change("qlbm_backend_type")
def _on_backend_type_change(**_):
_apply_workflow_highlights(_determine_workflow_step())
@_state.change("qlbm_time_val")
def update_time_frame(qlbm_time_val, **_):
"""Update the plotter with the frame corresponding to time_val."""
global simulation_data_frames, simulation_times, current_grid_object, _plotter
if not _state.qlbm_simulation_has_run or not simulation_data_frames or current_grid_object is None:
return
try:
idx = int(qlbm_time_val)
if 0 <= idx < len(simulation_data_frames):
current_grid_object["scalars"] = simulation_data_frames[idx].flatten()
isosurfaces = current_grid_object.contour(isosurfaces=7, scalars="scalars")
_plotter.clear()
_plotter.add_mesh(isosurfaces, cmap="Blues", opacity=0.3, show_scalar_bar=True)
_plotter.add_axes()
_plotter.show_grid()
t_val = simulation_times[idx] if idx < len(simulation_times) else idx
_state.qlbm_current_time_label = f"{t_val:.2f}" if isinstance(t_val, float) else str(t_val)
_plotter.add_text(f"Time: {t_val:.2f}" if isinstance(t_val, float) else f"Time: {t_val}",
name="time_label", position="upper_right")
_ensure_point_picking(on_pick_point)
if hasattr(_ctrl, "qlbm_view_update"):
_ctrl.qlbm_view_update()
except Exception as e:
print(f"Error updating time frame: {e}")
def _build_control_panels(plotter):
"""Build the left control panel cards."""
# Overview card
with vuetify3.VCard(classes="mb-2", style=("qlbm_overview_card_style", _WORKFLOW_BASE_STYLE)):
vuetify3.VCardTitle("Overview", classes="text-subtitle-2 font-weight-bold text-primary")
with vuetify3.VCardText():
vuetify3.VDivider(classes="my-2")
vuetify3.VCardSubtitle("Problems", classes="text-caption font-weight-bold mt-2")
with vuetify3.VTooltip(location="top"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VSelect(
v_bind="props",
key="qlbm_overview_problems",
label="Select a problem",
v_model=("qlbm_problems_selection", None),
items=(
"qlbm_qlbm_problems",
[
"Scalar advection-diffusion in a box",
"Laminar flow & heat transfer for a heated body in water.",
],
),
placeholder="Select",
density="compact",
hide_details=True,
color="primary",
classes="mb-2"
)
html.Span("Select a predefined fluid dynamics problem to solve")
# Geometry card
with vuetify3.VCard(classes="mb-2"):
vuetify3.VCardTitle("Geometry", classes="text-subtitle-2 font-weight-bold text-primary")
with vuetify3.VCardText():
vuetify3.VAlert(
v_if="qlbm_geometry_selection",
type="info",
variant="tonal",
density="compact",
color="primary",
children=["Selected Geometry: ", "{{ qlbm_geometry_selection }}"],
classes="mb-2"
)
vuetify3.VAlert(
v_if="!qlbm_geometry_selection",
type="info",
variant="tonal",
density="compact",
color="primary",
children=["No geometry selected. Choose a problem to auto-set."],
classes="mb-2"
)
with vuetify3.VContainer(v_if="qlbm_geometry_selection === 'Rectangular domain with a heated box (3D)'", classes="pa-0 mt-2"):
vuetify3.VCardSubtitle("Domain dimensions", classes="text-caption font-weight-bold mb-2")
with vuetify3.VRow(dense=True):
with vuetify3.VCol():
with vuetify3.VTooltip(location="top"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VTextField(v_bind="props", label="Length (L)", v_model=("qlbm_domain_L", 1.0), type="number", step="0.1", density="compact", hide_details=True, color="primary")
html.Span("Length of the domain along X axis")
with vuetify3.VCol():
with vuetify3.VTooltip(location="top"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VTextField(v_bind="props", label="Width (W)", v_model=("qlbm_domain_W", 1.0), type="number", step="0.1", density="compact", hide_details=True, color="primary")
html.Span("Width of the domain along Y axis")
with vuetify3.VCol():
with vuetify3.VTooltip(location="top"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VTextField(v_bind="props", label="Height (H)", v_model=("qlbm_domain_H", 1.0), type="number", step="0.1", density="compact", hide_details=True, color="primary")
html.Span("Height of the domain along Z axis")
# Initial Distribution card
with vuetify3.VCard(classes="mb-2", style=("qlbm_distribution_card_style", _WORKFLOW_BASE_STYLE)):
vuetify3.VCardTitle("Initial Distribution", classes="text-subtitle-2 font-weight-bold text-primary")
with vuetify3.VCardText():
with vuetify3.VRow(classes="d-flex align-center mb-2", no_gutters=True):
with vuetify3.VCol(cols="auto", classes="flex-grow-1"):
with vuetify3.VTooltip(location="top"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VSelect(
v_bind="props",
label="Initial Distribution",
v_model=("qlbm_dist_type", None),
items=("qlbm_dist_modes",),
density="compact",
hide_details=True
)
html.Span("Select the initial density distribution function")
with vuetify3.VCol(cols="auto", classes="ml-2"):
with vuetify3.VBtn(
icon=True, density="compact", variant="text",
click="qlbm_custom_dist_params = !qlbm_custom_dist_params"
):
vuetify3.VIcon("mdi-cog", color=("qlbm_custom_dist_params ? 'primary' : 'grey'",))
# Sinusoidal controls
with vuetify3.VCard(classes="mb-2", v_if="qlbm_custom_dist_params && qlbm_dist_type === 'Sinusoidal'"):
vuetify3.VCardTitle("Sinusoidal Frequencies")
with vuetify3.VCardText():
for axis in ['x', 'y', 'z']:
with vuetify3.VTooltip(location="top"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VSlider(
v_bind="props",
label=f"Freq {axis.upper()}",
v_model=(f"qlbm_sine_k_{axis}", 1.0),
min=1, max=5, step=1,
thumb_label="always", density="compact"
)
html.Span(f"Frequency multiplier for {axis.upper()} axis")
# Gaussian controls
with vuetify3.VCard(classes="mb-2", v_if="qlbm_custom_dist_params && qlbm_dist_type === 'Gaussian'"):
vuetify3.VCardTitle("Gaussian Parameters")
with vuetify3.VCardText():
for axis in ['x', 'y', 'z']:
with vuetify3.VTooltip(location="top"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VSlider(
v_bind="props",
label=f"Center {axis.upper()}",
v_model=(f"qlbm_gauss_c{axis}", 16),
min=0, max=("qlbm_nx", 32), step=1,
thumb_label="always", density="compact"
)
html.Span(f"Center position along {axis.upper()} axis")
with vuetify3.VTooltip(location="top"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VSlider(
v_bind="props",
label="Width (Sigma)",
v_model=("qlbm_gauss_sigma", 6.0),
min=1.0, max=20.0, step=0.5,
thumb_label="always", density="compact"
)
html.Span("Standard deviation (spread) of the Gaussian")
# Multi-Dirac-Delta controls
with vuetify3.VCard(classes="mb-2", v_if="qlbm_custom_dist_params && qlbm_dist_type === 'Multi-Dirac-Delta'"):
vuetify3.VCardTitle("Multi-Dirac-Delta Parameters")
with vuetify3.VCardText():
vuetify3.VCardSubtitle("Number of delta peaks per axis = 2^k", classes="text-caption mb-2")
for axis in ['x', 'y', 'z']:
with vuetify3.VTooltip(location="top"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VSlider(
v_bind="props",
label=f"k_{axis.upper()} (log₂)",
v_model=(f"qlbm_mdd_k{axis}_log2", 1),
min=1, max=4, step=1,
thumb_label="always", density="compact"
)
html.Span(f"Log2 of number of peaks along {axis.upper()}")
# Boundary Conditions
with vuetify3.VCard(classes="mb-2"):
vuetify3.VCardTitle("Boundary Conditions", classes="text-subtitle-2 font-weight-bold text-primary")
with vuetify3.VCardText():
with vuetify3.VTooltip(location="top"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VSelect(v_bind="props", label="Boundary Condition", v_model=("qlbm_boundary_condition", "Periodic"),
items=("['Periodic']",), density="compact", hide_details=True, color="primary")
html.Span("Select boundary conditions for the simulation domain")
# Advecting Fields
with vuetify3.VCard(classes="mb-2", style=("qlbm_advect_card_style", _WORKFLOW_BASE_STYLE)):
vuetify3.VCardTitle("Advecting Fields", classes="text-subtitle-2 font-weight-bold text-primary")
with vuetify3.VCardText():
with vuetify3.VRow(classes="d-flex align-center mb-2", no_gutters=True):
with vuetify3.VCol(cols="auto", classes="flex-grow-1"):
with vuetify3.VTooltip(location="top"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VSelect(
v_bind="props",
label="Select Advecting field",
v_model=("qlbm_advecting_field", None),
items=("['Uniform', 'Swirl', 'Shear', 'TGV']",),
density="compact",
hide_details=True,
color="primary",
placeholder="Select",
)
html.Span("Select the velocity field that transports the fluid")
with vuetify3.VCol(cols="auto", classes="ml-2"):
with vuetify3.VBtn(
icon=True, density="compact", variant="text",
click="qlbm_show_advect_params = !qlbm_show_advect_params"
):
vuetify3.VIcon("mdi-cog", color=("qlbm_show_advect_params ? 'primary' : 'grey'",))
with vuetify3.VContainer(v_if="qlbm_show_advect_params", classes="pa-0 mt-2"):
html.Div("Velocity Components", classes="text-caption mb-1")
with vuetify3.VTooltip(location="top"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VTextField(v_bind="props", label="Velocity vx", v_model=("qlbm_vx_expr", "0.2"), density="compact", hide_details=True, color="primary", classes="mb-1")
html.Span("X-component of velocity field")
with vuetify3.VTooltip(location="top"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VTextField(v_bind="props", label="Velocity vy", v_model=("qlbm_vy_expr", "-0.15"), density="compact", hide_details=True, color="primary", classes="mb-1")
html.Span("Y-component of velocity field")
with vuetify3.VTooltip(location="top"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VTextField(v_bind="props", label="Velocity vz", v_model=("qlbm_vz_expr", "0.3"), density="compact", hide_details=True, color="primary")
html.Span("Z-component of velocity field")
# Meshing
with vuetify3.VCard(classes="mb-2", style=("qlbm_meshing_card_style", _WORKFLOW_BASE_STYLE)):
vuetify3.VCardTitle("Meshing", classes="text-subtitle-2 font-weight-bold text-primary")
with vuetify3.VCardText():
with vuetify3.VMenu(open_on_hover=True, close_on_content_click=False, location="end"):
with vuetify3.Template(v_slot_activator="{ props }"):
with vuetify3.VSlider(
v_bind="props",
label="Number of Points / Direction",
v_model=("qlbm_grid_index", 2),
min=0, max=5, step=1,
thumb_label="always",
show_ticks="always",
color="primary",
density="compact",
hide_details=True
):
vuetify3.Template(v_slot_thumb_label="{ modelValue }", children=["{{ ['8','16','32','64','128','256'][modelValue] }}"])
with vuetify3.VSheet(classes="pa-2", elevation=6, rounded=True, style="width: 700px;"):
with vuetify3.VContainer(fluid=True, classes="pa-0"):
qubit_fig = plotly_widgets.Figure(figure=go.Figure(), style="width: 616px; height: 320px; min-height: 320px;", responsive=True)
_ctrl.qlbm_qubit_plot_update = qubit_fig.update
html.Div("{{ qlbm_qubit_grid_info }}", classes="mt-2 text-caption")
html.Div("{{ qlbm_qubit_warning }}", classes="warn-text")
vuetify3.VAlert(v_if="qlbm_grid_size > 32", type="warning", variant="tonal", density="compact",
children=["Warning: High grid size may impact performance."], classes="mt-2")
# Time
with vuetify3.VCard(classes="mb-2"):
vuetify3.VCardTitle("Time", classes="text-subtitle-2 font-weight-bold text-primary")
with vuetify3.VCardText():
with vuetify3.VTooltip(location="top"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VSlider(v_bind="props", label="Total Time", v_model=("qlbm_time_steps", 10), min=0, max=30, step=1,
thumb_label="always", show_ticks="always", color="primary", density="compact", hide_details=True)
html.Span("Number of time steps to simulate")
vuetify3.VAlert(v_if="qlbm_time_steps > 100", type="warning", variant="tonal", density="compact",
children=["Warning: High time steps may increase runtime."], classes="mt-2")
# Backends
with vuetify3.VCard(classes="mb-2", style=("qlbm_backend_card_style", _WORKFLOW_BASE_STYLE)):
vuetify3.VCardTitle("Backends", classes="text-subtitle-2 font-weight-bold text-primary")
with vuetify3.VCardText():
with vuetify3.VRow(dense=True, classes="mb-2"):
with vuetify3.VCol():
vuetify3.VAlert(
type="info",
color="primary",
variant="tonal",
density="compact",
children=[
"Selected: ",
"{{ qlbm_backend_type || 'None - Please select a backend' }}",
" - ",
"{{ qlbm_backend_type === 'Simulator' ? (qlbm_selected_simulator || '—') : (qlbm_backend_type === 'QPU' ? (qlbm_selected_qpu || '—') : '—') }}",
],
)
with vuetify3.VMenu(open_on_hover=True, close_on_content_click=True, location="end"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VBtn(v_bind="props", text="Choose Backend", color="primary", variant="tonal", block=True)
with vuetify3.VList(density="compact"):
with vuetify3.VMenu(open_on_hover=True, close_on_content_click=True, location="end", offset=8):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VListItem(v_bind="props", title="Simulator", prepend_icon="mdi-robot-outline", append_icon="mdi-chevron-right")
with vuetify3.VList(density="compact"):
vuetify3.VListItem(title="CUDA-Q simulator", click="qlbm_backend_type = 'Simulator'; qlbm_selected_simulator = 'CUDA-Q simulator'")
vuetify3.VListItem(title="IBM Qiskit simulator", click="qlbm_backend_type = 'Simulator'; qlbm_selected_simulator = 'IBM Qiskit simulator'")
with vuetify3.VMenu(open_on_hover=True, close_on_content_click=True, location="end", offset=8):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VListItem(v_bind="props", title="QPU", prepend_icon="mdi-chip", append_icon="mdi-chevron-right")
with vuetify3.VList(density="compact"):
vuetify3.VListItem(title="IBM QPU", click="qlbm_backend_type = 'QPU'; qlbm_selected_qpu = 'IBM QPU'")
vuetify3.VListItem(title="IonQ QPU", click="qlbm_backend_type = 'QPU'; qlbm_selected_qpu = 'IonQ QPU'")
# IBM QPU Warning for grid > 16
vuetify3.VAlert(
v_if="qlbm_backend_type === 'QPU' && qlbm_selected_qpu === 'IBM QPU' && qlbm_grid_size > 16",
type="warning",
variant="tonal",
density="compact",
children=["⚠️ Grid size > 16 may exceed IBM QPU capacity!"],
classes="mt-2"
)
# Sinusoidal Warning for IBM QPU
vuetify3.VAlert(
v_if="qlbm_backend_type === 'QPU' && qlbm_selected_qpu === 'IBM QPU' && qlbm_dist_type === 'Sinusoidal'",
type="warning",
variant="tonal",
density="compact",
children=["⚠️ Sinusoidal distribution results in very high circuit depth on IBM QPU!"],
classes="mt-2"
)
# IonQ Restriction Warning
vuetify3.VAlert(
v_if="qlbm_backend_type === 'QPU' && qlbm_selected_qpu === 'IonQ QPU' && qlbm_dist_type !== 'Multi-Dirac-Delta'",
type="error",
variant="tonal",
density="compact",
children=["⚠️ IonQ QPU only supports Multi-Dirac-Delta distribution."],
classes="mt-2"
)
vuetify3.VDivider(classes="my-3")
vuetify3.VBtn(
text="Run",
color="primary",
block=True,
disabled=("qlbm_is_running || !qlbm_backend_type || (qlbm_backend_type === 'QPU' && qlbm_selected_qpu === 'IonQ QPU' && qlbm_dist_type !== 'Multi-Dirac-Delta')", True),
click=run_simulation,
style=("qlbm_is_running ? '' : 'background-color:#87CEFA;'", ""),
)
# Backend mode and notes hidden as per user request
# html.Div("Backend: {{ qlbm_simulation_backend_mode }}", classes="text-caption text-medium-emphasis mt-2")
# vuetify3.VAlert(
# v_if="qlbm_simulation_backend_note",
# type="info",
# variant="tonal",
# density="compact",
# children=["{{ qlbm_simulation_backend_note }}"],
# classes="mt-2",
# )
with vuetify3.VRow(dense=True, classes="mt-2"):
with vuetify3.VCol(cols=6):
vuetify3.VBtn(
text="Reset",
color="#8BC34A",
variant="tonal",
block=True,
disabled=("qlbm_is_running", False),
click=reset_simulation,
)
with vuetify3.VCol(cols=6):
vuetify3.VBtn(
text="STOP",
color="#FF7043",
variant="tonal",
block=True,
click=stop_simulation,
disabled=("!qlbm_is_running", True),
)
# --- Job Result Upload Section ---
vuetify3.VDivider(classes="my-3")
html.Div("Upload Results", classes="text-subtitle-2 font-weight-bold text-primary mb-2")
html.Div("Retrieve completed job results from IBM or IonQ using the Job ID",
classes="text-caption text-medium-emphasis mb-2")
# Platform selector
with vuetify3.VTooltip(location="top"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VSelect(
v_bind="props",
label="Platform",
v_model=("qlbm_job_platform", "IonQ"),
items=("['IBM', 'IonQ']",),
density="compact",
hide_details=True,
color="primary",
classes="mb-2",
prepend_icon="mdi-chip",
)
html.Span("Select the quantum hardware provider (IBM or IonQ)")
# Job ID input
with vuetify3.VTooltip(location="top"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VTextField(
v_bind="props",
label="Job ID",
v_model=("qlbm_job_id", ""),
density="compact",
hide_details=True,
color="primary",
classes="mb-2",
placeholder="e.g., 019b368e-6e22-7525-8512-fd16e0503673",
prepend_icon="mdi-identifier",
)
html.Span("Enter the Job ID (UUID format from IBM or IonQ)")
# Output resolution and Total Time in a row
with vuetify3.VRow(dense=True, classes="mb-2"):
with vuetify3.VCol(cols=6):
with vuetify3.VTooltip(location="top"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VTextField(
v_bind="props",
label="Total Time",
v_model=("qlbm_job_total_time", 3),
type="number",
density="compact",
hide_details=True,
color="primary",
)
html.Span("Total number of time steps (T) used when running the job")
with vuetify3.VCol(cols=6):
with vuetify3.VTooltip(location="top"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VTextField(
v_bind="props",
label="Output Resolution",
v_model=("qlbm_job_output_resolution", 40),
type="number",
density="compact",
hide_details=True,
color="primary",
)
html.Span("Resolution for 3D visualization. Should be <= Grid Size (2^n).")
# Generate button
vuetify3.VBtn(
text="Retrieve & Generate Plot",
color="secondary",
variant="tonal",
block=True,
disabled=("!qlbm_job_id || qlbm_job_is_processing", True),
loading=("qlbm_job_is_processing", False),
click=process_uploaded_job_result,
prepend_icon="mdi-chart-box-outline",
classes="mb-2",
)
# Success message
vuetify3.VAlert(
v_if="qlbm_job_upload_success",
type="success",
variant="tonal",
density="compact",
closable=True,
children=["{{ qlbm_job_upload_success }}"],
classes="mt-2",
)
# Error message
vuetify3.VAlert(
v_if="qlbm_job_upload_error",
type="error",
variant="tonal",
density="compact",
closable=True,
children=["{{ qlbm_job_upload_error }}"],
classes="mt-2",
)
def _build_visualization_panel(plotter):
"""Build the right visualization panel."""
# Main Plot Card
with vuetify3.VCard(classes="mb-1 flex-grow-1 d-flex flex-column", elevation=2, style="min-height: 0;"):
# Geometry Preview (Plotly) - when no simulation and no distribution selected
with vuetify3.VContainer(v_if="!qlbm_simulation_has_run && !qlbm_dist_type && qlbm_geometry_selection",
fluid=True, classes="pa-0 flex-grow-1", style="width: 100%; height: 100%;"):
geom_fig = plotly_widgets.Figure(figure=go.Figure(), style="width: 100%; height: 100%;", responsive=True)
_ctrl.qlbm_geometry_plot_update = geom_fig.update
# Distribution Preview (Plotly) - when distribution selected but no simulation
with vuetify3.VContainer(v_if="!qlbm_simulation_has_run && qlbm_dist_type",
fluid=True, classes="pa-0 flex-grow-1", style="width: 100%; height: 100%;"):
preview_fig = plotly_widgets.Figure(figure=go.Figure(), style="width:100%; height:100%;", responsive=True)
_ctrl.qlbm_preview_update = preview_fig.update
# Download controls (for both modes)
with vuetify3.VContainer(v_if="qlbm_simulation_has_run", classes="px-4 pt-3 pb-1 d-flex justify-end",
style="width: 100%; flex: 0 0 auto;"):
with vuetify3.VMenu(location="bottom end"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VBtn(
v_bind="props",
text="Download",
color="primary",
variant="tonal",
prepend_icon="mdi-download"
)
with vuetify3.VList(density="compact"):
# VTK and MP4 exports only for non-Qiskit mode
vuetify3.VListItem(
v_if="!qlbm_qiskit_mode",
title="Export as VTK",
prepend_icon="mdi-content-save",
click=export_simulation_vtk
)
vuetify3.VListItem(
v_if="!qlbm_qiskit_mode",
title="Export as MP4",
prepend_icon="mdi-movie",
click=export_simulation_mp4
)
# TODO: Add Plotly HTML export for Qiskit mode
vuetify3.VListItem(
v_if="qlbm_qiskit_mode",
title="Export as HTML (Plotly)",
prepend_icon="mdi-language-html5",
disabled=True, # Not yet implemented
)
# === Qiskit Simulation Result (Plotly with built-in slider) ===
with vuetify3.VContainer(v_if="qlbm_simulation_has_run && qlbm_qiskit_mode",
fluid=True, classes="pa-0 flex-grow-1",
style="width: 100%; height: 100%;"):
qiskit_fig = plotly_widgets.Figure(
figure=go.Figure(),
style="width:100%; height:100%;",
responsive=True
)
_ctrl.qlbm_qiskit_result_update = qiskit_fig.update
# === PyVista Simulation Result (for CUDA-Q/CPU demo) ===
with vuetify3.VContainer(v_if="qlbm_simulation_has_run && !qlbm_qiskit_mode",
fluid=True, classes="pa-0 flex-grow-1",
style="width: 100%; height: 100%;"):
view = plotter_ui(plotter)
_ctrl.qlbm_view_update = view.update
# Time Slider (only for non-Qiskit mode - Qiskit Plotly has built-in slider)
with vuetify3.VContainer(v_if="qlbm_simulation_has_run && !qlbm_qiskit_mode",
classes="px-4 pb-4", style="width: 90%; flex: 0 0 auto;"):
with vuetify3.VSlider(
v_model=("qlbm_time_val", 0),
min=0,
max=("qlbm_max_time_step", 10),
step=1,
label="Time",
thumb_label="always",
density="compact",
hide_details=True,
color="primary"
):
vuetify3.Template(
v_slot_thumb_label="{ modelValue }",
children=["{{ qlbm_time_slider_labels[modelValue] || modelValue }}"]
)
# Console Window
with vuetify3.VCard(classes="mt-1", style="font-size: 0.8rem; flex: 0 0 auto;"):
vuetify3.VCardTitle("Status", classes="text-subtitle-1 text-primary", style="font-size: 0.9rem; padding: 6px 10px;")
with vuetify3.VCardText(classes="py-1 px-2", style="height: 150px; overflow-y: auto; background-color: #f5f5f5; font-family: monospace;"):
vuetify3.VTextarea(
v_model=("qlbm_console_output", ""),
readonly=True,
auto_grow=False,
rows=6,
variant="plain",
hide_details=True,
style="font-family: monospace; width: 100%; height: 100%;"
)
def _build_status_window():
"""Build the floating status window."""
with vuetify3.VCard(
v_if="qlbm_status_visible",
style="position: fixed; bottom: 16px; right: 16px; z-index: 1000; min-width: 320px; max-width: 450px;",
elevation=8
):
with vuetify3.VCardTitle(classes="d-flex align-center", style="font-size: 0.95rem; padding: 8px 12px;"):
vuetify3.VIcon("mdi-information-outline", size="small", classes="mr-2")
html.Span("Simulation Status")
vuetify3.VSpacer()
vuetify3.VBtn(
icon="mdi-close",
size="x-small",
variant="text",
click="qlbm_status_visible = false"
)
vuetify3.VDivider()
with vuetify3.VCardText(classes="py-2 px-3"):
vuetify3.VAlert(
type=("qlbm_status_type", "info"),
variant="tonal",
density="compact",
children=["{{ qlbm_status_message }}"]
)
with vuetify3.VContainer(v_if="qlbm_show_progress", classes="pa-0 mt-2"):
vuetify3.VProgressLinear(
model_value=("qlbm_simulation_progress", 0),
color="primary",
height=6,
striped=True
)
html.Div(
"{{ qlbm_simulation_progress }}% complete",
classes="text-caption text-center mt-1",
style="font-size: 0.75rem;"
)