quantum / em_embedded.py
harishaseebat92
Refactored the EM backend selection by (1) rebranding the former IBM QPU path into a dedicated Statevector Estimator simulator and (2) rewiring the backend plumbing so the heavy lifting now happens inside
c131b0b
raw
history blame
168 kB
import numpy as np
import re
import pyvista as pv
import threading
import base64
from collections import defaultdict
import tempfile
from pathlib import Path
from trame_vuetify.widgets import vuetify3
from pyvista.trame.ui import plotter_ui
import plotly.graph_objects as go
import plotly.io as pio
from trame_plotly.widgets import plotly as plotly_widgets
import os
from datetime import datetime
import time
from trame.widgets import html as trame_html
# Colorscale given as [stop, hex color] pairs with stops running from 0.0 to 1.0.
# NOTE(review): presumably consumed by the excitation surface preview; the
# consumer is not visible in this part of the file — confirm before changing.
EXCITATION_SURFACE_COLORSCALE = [
[0.0, "#001219"],
[0.25, "#005F73"],
[0.5, "#94D2BD"],
[0.75, "#EE9B00"],
[1.0, "#CA6702"],
]
try:
# Prefer package import when running as a module: python -m quantum.em_trame
from quantum.utils.delta_impulse_generator import *
import quantum.utils.delta_impulse_generator as qutils
except ModuleNotFoundError:
# Fallback when running this file directly: python quantum/em_trame.py
from utils.delta_impulse_generator import *
import utils.delta_impulse_generator as qutils
# from utils.base_functions import *
# Force PyVista into off-screen rendering at import time, before any Plotter
# is created further down in this module (rendering is delivered through Trame).
pv.OFF_SCREEN = True
# ============================================================================
# DEFERRED STATE/CONTROLLER PROXY CLASSES
# These allow using state.update(), @state.change(), and ctrl.xxx at module
# load time, then applying them when set_server() is called.
# ============================================================================
class _DeferredStateProxy:
    """
    Stand-in for ``server.state`` that can be used before a server exists.

    While unbound, the proxy records default values (``update()`` and plain
    attribute assignment) and ``@state.change(...)`` registrations. ``bind()``
    replays all of them onto the real state object; from then on every
    operation is forwarded directly.

    Values recorded while unbound can also be read back as attributes, so
    module-level code that writes a default and later reads it does not fail
    just because the server has not been attached yet.
    """

    def __init__(self):
        self._state = None
        self._defaults = {}
        self._pending_changes = []  # list of (keys, func)

    def bind(self, real_state):
        """Bind to the real state object and apply all pending operations."""
        self._state = real_state
        # Push every default collected so far in a single update call.
        if self._defaults:
            real_state.update(self._defaults)
            self._defaults.clear()
        # Replay the queued @state.change registrations in declaration order.
        for keys, func in self._pending_changes:
            real_state.change(*keys)(func)
        self._pending_changes.clear()

    @property
    def bound(self):
        """True once ``bind()`` has attached a real state object."""
        return self._state is not None

    def update(self, d):
        """Collect defaults; apply immediately if bound."""
        if self._state is not None:
            self._state.update(d)
        else:
            self._defaults.update(d)

    def change(self, *keys):
        """
        Decorator factory that mimics @state.change("key1", "key2").

        If already bound, the handler is registered immediately; otherwise it
        is queued until ``bind()``. The decorated function is returned
        unchanged either way.
        """
        def decorator(func):
            if self._state is not None:
                # Already bound - register directly
                self._state.change(*keys)(func)
            else:
                # Queue for later
                self._pending_changes.append((keys, func))
            return func
        return decorator

    def __getattr__(self, name):
        # Private names are real instance attributes; reaching this point means
        # they are genuinely missing (this also prevents infinite recursion).
        if name.startswith("_"):
            raise AttributeError(name)
        if self._state is None:
            # Unbound: serve values that were stored as defaults so that reads
            # are symmetric with writes; unknown names still raise.
            try:
                return self._defaults[name]
            except KeyError:
                raise AttributeError(
                    f"State not bound yet; cannot access '{name}'"
                ) from None
        return getattr(self._state, name)

    def __setattr__(self, name, value):
        if name.startswith("_"):
            object.__setattr__(self, name, value)
        elif self._state is not None:
            setattr(self._state, name, value)
        else:
            # Store as a default
            self._defaults[name] = value
class _DeferredControllerProxy:
    """
    Stand-in for ``server.controller`` that can be used before a server exists.

    Attribute assignments made while unbound are remembered and copied onto
    the real controller by ``bind()``. Until then the remembered values can be
    read back, so placeholder callbacks assigned at import time stay callable.
    """

    def __init__(self):
        self._ctrl = None
        self._pending = {}

    def bind(self, real_ctrl):
        """Bind to the real controller and apply pending attributes."""
        self._ctrl = real_ctrl
        for name, value in self._pending.items():
            setattr(real_ctrl, name, value)
        self._pending.clear()

    @property
    def bound(self):
        """True once ``bind()`` has attached a real controller."""
        return self._ctrl is not None

    def __getattr__(self, name):
        # Private names live on the instance itself; a miss here is a real miss
        # (and raising immediately avoids recursion during __init__).
        if name.startswith("_"):
            raise AttributeError(name)
        if self._ctrl is None:
            # Unbound: hand back what was assigned earlier instead of failing,
            # otherwise the load-time placeholders would be unusable.
            try:
                return self._pending[name]
            except KeyError:
                raise AttributeError(
                    f"Controller not bound yet; cannot access '{name}'"
                ) from None
        return getattr(self._ctrl, name)

    def __setattr__(self, name, value):
        if name.startswith("_"):
            object.__setattr__(self, name, value)
        elif self._ctrl is not None:
            setattr(self._ctrl, name, value)
        else:
            self._pending[name] = value
# Module-level proxies (will be bound when set_server is called).
# `_server` stays None until set_server() stores the shared server in it;
# `state` and `ctrl` collect defaults/handlers until they are bound.
_server = None
state = _DeferredStateProxy()
ctrl = _DeferredControllerProxy()
def _noop(*_, **__):
    """Accept any positional/keyword arguments and do nothing."""
    return None
# Register do-nothing placeholders for the controller hooks this module calls.
# NOTE(review): presumably the UI layer replaces these with real widget update
# functions later — that code is not visible here.
ctrl.qpu_ts_update = _noop
ctrl.qpu_ts_other_update = _noop
ctrl.view_update = _noop
ctrl.sim_ts_update = _noop
ctrl.geometry_preview_update = _noop
ctrl.excitation_preview_update = _noop
ctrl.qubit_plot_update = _noop
def set_server(server):
    """Attach this module to the shared Trame server.

    Stores the server in the module-level ``_server`` and binds the deferred
    ``state`` and ``ctrl`` proxies to ``server.state`` / ``server.controller``,
    which replays everything they collected at import time.
    """
    global _server
    _server = server

    real_state = server.state
    state.bind(real_state)

    real_controller = server.controller
    ctrl.bind(real_controller)
def init_state(force: bool = False):
    """Initialise the EM state once the proxy is bound.

    Does nothing while ``state`` is still unbound. When ``force`` is true the
    defaults are restored through ``reset_to_defaults()`` first. The initial
    state preview is then refreshed on a best-effort basis.
    """
    if state.bound:
        if force:
            reset_to_defaults()
        # The preview may not be constructible yet; failures are ignored on purpose.
        try:
            update_initial_state_preview()
        except Exception:
            pass
# Cache for QPU Plotly export/selection (Python-side).
# Filled by _build_qpu_timeseries_plotly_multi and read by the filter, click
# and export helpers:
#   times              - list of snapshot times
#   series_map         - {(field, px, py): [values...]}
#   field              - single field name, "multi", or "" (used in export filenames)
#   fig                - the unfiltered Plotly figure
#   positions_by_field - {field or "All": [entry dicts with coords/label/field]}
#   key_to_label       - {(field, px, py): label}
#   label_to_keys      - {label: [(field, px, py), ...]}
#   nx                 - grid size used for the cached run
# The builder additionally stores a "unique_fields" key at run time.
qpu_ts_cache = {
"times": None,
"series_map": None,
"field": None,
"fig": None,
"positions_by_field": {"All": []},
"key_to_label": {},
"label_to_keys": {},
"nx": None,
}
# --- Application State ---
# Default values for every UI-facing state key. Because `state` is a deferred
# proxy, this call only records the defaults at import time; they are pushed
# to the real server state when set_server() binds the proxy.
state.update({
"problem_selection": None,
"geometry_options": ["None", "Square Metallic Body", "Square Domain", "Geometry 2", "Add"],
"dist_type": None, "impulse_x": 0.5, "impulse_y": 0.5,
"peak_pair": "(0.5, 0.5)",
"mu_x": 0.5, "mu_y": 0.5, "sigma_x": 0.25, "sigma_y": 0.15,
"mu_pair": "(0.5, 0.5)", # Gaussian Mu as normalized pair string
"sigma_pair": "(0.25, 0.15)", # Gaussian Sigma as normalized pair string
"nx": None, "T": 1.0, "time_val": 0.0,
"L": 1.0, # Side length used for coordinate conversion
"output_type": "Surface Plot",
"surface_field": "Ez",
"timeseries_field": "Ez",
"timeseries_points": "(0.5, 0.5)",
"timeseries_gridpoints": "",
"timeseries_point_info": "",
"error_message": "",
"is_running": False,
"simulation_has_run": False,
"geometry_selection": None,
"show_upload_dialog": False,
"uploaded_file_info": None,
"show_upload_status": False,
"upload_status_message": "",
"coeff_permittivity": 1.0, # Relative permittivity (ε_r)
"coeff_permeability": 1.0, # Relative permeability (μ_r)
"run_button_text": "RUN!",
"backend_type": None,
"selected_simulator": "IBM Qiskit simulator",
"selected_qpu": "IBM QPU",
"stop_button_disabled": True, # Stop button is initially disabled
"export_format": "vtk", # Dummy export format for Surface Plot
"nx_slider_index": None, # No selection until user chooses on the slider
"show_export_status": False,
"export_status_message": "",
"logo_src": None,
# Dummy geometry-hole controls (UI only)
"hole_size_edge": 0.2, # edge length in [0, 1]
"hole_center_x": 0.5, # center X in (0, 1)
"hole_center_y": 0.5, # center Y in (0, 1)
"hole_center_pair": "(0.5, 0.5)", # bracket-format center for dropdown
"hole_error_message": "", # validation message
"excitation_error_message": "", # parse/validation for Gaussian pair inputs
"excitation_info_message": "", # Snapping info for excitation coordinates
"excitation_config_open": False,
"dt_user": 0.1, # Dummy Δt input from user (seconds)
"temporal_warning": "", # Warning message for invalid Δt
"qpu_field_components": "Ez", # Dummy field component for QPU
"qpu_monitor_gridpoints": "", # Derived gridpoints for QPU monitors
"qpu_monitor_samples": "(0.5, 0.5)", # User-facing normalized sample input
"qpu_monitor_sample_info": "",
"console_output": "Console initialized.\n",
# Square aspect for initial preview
"pyvista_view_style": "aspect-ratio: 1 / 1; width: 100%;",
"qpu_ts_fig": None,
"qpu_ts_selected_time": None,
"qpu_ts_ready": False,
# Hide Plotly widget until data; enlarge Y using a taller min-height
"qpu_plot_style": "display: none; width: 900px; height: 660px; margin: 0 auto;",
# Second Plotly chart for other components
"qpu_ts_other_ready": False,
"qpu_other_plot_style": "display: none; width: 900px; height: 660px; margin: 0 auto;",
# NEW: multiple QPU monitor configurations (list of { field, points })
"qpu_monitor_configs": [], # unused when using primitives
# NEW: dropdown filter for which component to plot
"qpu_plot_filter": "All",
"qpu_plot_field_options": ["All"],
"qpu_plot_position_filter": "All positions",
"qpu_plot_position_options": ["All positions"],
# Primitive slots for additional monitors (2..5)
"qpu_monitor_count": 0, # number of additional monitors enabled
"qpu_field_components_2": "Ez",
"qpu_monitor_gridpoints_2": "",
"qpu_monitor_samples_2": "",
"qpu_monitor_sample_info_2": "",
"qpu_field_components_3": "Ez",
"qpu_monitor_gridpoints_3": "",
"qpu_monitor_samples_3": "",
"qpu_monitor_sample_info_3": "",
"qpu_field_components_4": "Ez",
"qpu_monitor_gridpoints_4": "",
"qpu_monitor_samples_4": "",
"qpu_monitor_sample_info_4": "",
"qpu_field_components_5": "Ez",
"qpu_monitor_gridpoints_5": "",
"qpu_monitor_samples_5": "",
"qpu_monitor_sample_info_5": "",
# Status window
"status_visible": False,
"status_message": "Ready",
"status_type": "info", # info, success, warning, error
"simulation_progress": 0,
"show_progress": False,
"console_logs": "Console initialized...\n",
})
# Ensure hole snap state exists
state.hole_snap = True
# State keys holding the inline CSS of each workflow card. The list position
# is the workflow step index compared against in _apply_workflow_highlights.
_WORKFLOW_CARD_KEYS = [
"overview_card_style",
"geometry_card_style",
"excitation_card_style",
"meshing_card_style",
"backend_card_style",
"output_card_style",
]
def _workflow_highlight_style(active: bool) -> str:
    """Return the inline CSS for a workflow card.

    Every card gets the base style; the active card additionally gets the
    purple accent border, glow and tint appended after a single space.
    """
    css_parts = ["font-size: 0.8rem; border: 1px solid transparent; transition: box-shadow 0.2s ease;"]
    if active:
        css_parts.append(
            "border: 2px solid #5F259F; box-shadow: 0 0 12px rgba(95,37,159,0.45); background-color: rgba(95,37,159,0.02);"
        )
    return " ".join(css_parts)
def _determine_workflow_step() -> int:
    """Return the index of the first workflow step that is still incomplete.

    Steps are checked in order: problem (0), geometry (1), excitation type
    (2), grid size (3), backend (4). When all are set the result is 5.
    """
    # Each check is lazy so that later state keys are only read when every
    # earlier step has been completed.
    incomplete_checks = (
        lambda: not state.problem_selection,
        lambda: not state.geometry_selection,
        lambda: not state.dist_type,
        lambda: state.nx is None,
        lambda: not state.backend_type,
    )
    for step, is_incomplete in enumerate(incomplete_checks):
        if is_incomplete():
            return step
    return len(incomplete_checks)
def _apply_workflow_highlights(step_index: int):
    """Record the current workflow step and restyle every workflow card.

    Only the card whose position in ``_WORKFLOW_CARD_KEYS`` equals
    ``step_index`` receives the highlighted style.
    """
    state.workflow_step = step_index
    position = 0
    for style_key in _WORKFLOW_CARD_KEYS:
        is_current = position == step_index
        setattr(state, style_key, _workflow_highlight_style(is_current))
        position += 1
# Seed the card styles at import time with the first workflow step highlighted.
_apply_workflow_highlights(0)
# --- Load Synopsys logo (from quantum folder) as data URI ---
def load_logo_data_uri():
    """Return the first available logo file as a base64 ``data:`` URI.

    The candidate files are looked up next to this module, in priority order.
    Returns ``None`` when none of them exists.
    """
    module_dir = os.path.dirname(__file__)
    logo_names = (
        "ansys-part-of-synopsys-logo.svg",
        "synopsys-logo-color-rgb.svg",
        "synopsys-logo-color-rgb.png",
        "synopsys-logo-color-rgb.jpg",
    )
    mime_by_ext = {".svg": "image/svg+xml", ".png": "image/png"}

    for logo_name in logo_names:
        logo_path = os.path.join(module_dir, logo_name)
        if not os.path.exists(logo_path):
            continue
        suffix = os.path.splitext(logo_path)[1].lower()
        # Anything that is neither SVG nor PNG is served as JPEG.
        mime = mime_by_ext.get(suffix, "image/jpeg")
        with open(logo_path, "rb") as handle:
            encoded = base64.b64encode(handle.read()).decode("ascii")
        return f"data:{mime};base64,{encoded}"
    return None
# Resolve the logo once at import time (None when no logo file is present).
state.logo_src = load_logo_data_uri()
# --- Global PyVista and Data Variables ---
# Module-wide plotter and simulation results.
# NOTE(review): these are presumably reassigned by the run/plot routines
# elsewhere in the file; only the export helpers are visible here.
plotter = pv.Plotter()
simulation_data = None
current_mesh = None
data_frames = None
z_scale = 1.0
X_grids, Y_grids = {}, {}
surface_clims = {}
stop_simulation = False # Flag to stop running simulation
snapshot_times = None # Times corresponding to saved frames (user snapshots)
# Stable id generator for QPU monitor config rows
# (_new_monitor_cfg increments before use, so the first id handed out is 2).
qpu_cfg_id_counter = 1
def _new_monitor_cfg(field: str = "Ez", points: str = "(8, 8)") -> dict:
    """Build a monitor config dict carrying a fresh unique ``id``.

    The module-level counter is advanced first and its new value becomes the
    id, which gives each row a stable key for v-for rendering.
    """
    global qpu_cfg_id_counter
    fresh_id = qpu_cfg_id_counter + 1
    qpu_cfg_id_counter = fresh_id
    return dict(id=fresh_id, field=field, points=points)
# --- Constants ---
# Selectable grid sizes, kept as strings.
# NOTE(review): presumably indexed by the `nx_slider_index` state key — confirm.
GRID_SIZES = ["16", "32", "64", "128", "256", "512"]
# Default tick positions on a normalized 0..1 axis.
DEFAULT_AXIS_TICKS = (0.0, 0.25, 0.5, 0.75, 1.0)
# --- Plotting and Simulation Logic ---
# New: Build a PyVista time series chart for QPU results (same look as simulator)
# Build Plotly time series for QPU and helpers
import plotly.graph_objects as go # ensure present
# NEW: multi-config Plotly builder for QPU
from matplotlib import cm as _mpl_cm
# Helper to choose cmap per field (Ez→Reds, Hx→Greens, Hy/default→Blues)
def _cmap_for_field(field: str):
    """Return the matplotlib colormap used for traces of ``field``.

    ``Ez`` maps to Reds, ``Hx`` to Greens, and everything else (including
    ``Hy``) to Blues. The argument is converted with ``str()`` before matching.
    """
    cmap_name = {"Ez": "Reds", "Hx": "Greens"}.get(str(field), "Blues")
    return getattr(_mpl_cm, cmap_name)
def _normalized_position_label(px: int, py: int, gw: int, gh: int) -> str:
    """Format a grid point as ``Position (x, y)`` in normalized coordinates.

    ``px``/``py`` are divided by ``gw - 1``/``gh - 1`` (clamped to at least 1
    so a one-cell grid does not divide by zero) and printed with three
    decimals.
    """
    col, row = int(px), int(py)
    x_fraction = col / float(max(gw - 1, 1))
    y_fraction = row / float(max(gh - 1, 1))
    return "Position ({:.3f}, {:.3f})".format(x_fraction, y_fraction)
def _format_grid_label(px: int, py: int, field: str | None = None) -> str:
px_i, py_i = int(px), int(py)
label_map = qpu_ts_cache.get("key_to_label") or {}
if field:
label = label_map.get((str(field), px_i, py_i))
if label:
return label
for (fld, gx, gy), label in label_map.items():
if gx == px_i and gy == py_i:
return label
nx_val = getattr(state, "nx", None)
if nx_val:
denom = float(max(int(nx_val) - 1, 1))
return f"Position ({px_i / denom:.3f}, {py_i / denom:.3f})"
return f"Position ({px_i}, {py_i})"
def _update_qpu_position_options(current_field: str = "All"):
    """Rebuild the position dropdown options for the given field filter.

    Reads the cached ``positions_by_field`` entries for ``current_field``
    (falling back to the "All" bucket), turns them into labels, de-duplicates
    them while keeping their order, and stores
    ``["All positions", *labels]`` in ``state.qpu_plot_position_options``.
    If the currently selected position is no longer offered, the selection is
    reset to the first option.

    Entries that carry neither a label nor usable coordinates are skipped, so
    one malformed entry no longer aborts the whole refresh.
    """
    try:
        field_key = (current_field or "All").strip() or "All"
        pos_map = qpu_ts_cache.get("positions_by_field") or {}
        entries = pos_map.get(field_key) or pos_map.get("All") or []

        labels = []
        for entry in entries:
            label = None
            if isinstance(entry, dict):
                label = entry.get("label")
                if not label:
                    coords = entry.get("coords")
                    # No label and no coordinates: nothing to show for this entry.
                    if not coords or len(coords) < 2 or coords[0] is None or coords[1] is None:
                        continue
                    label = _format_grid_label(coords[0], coords[1], entry.get("field"))
            elif isinstance(entry, (list, tuple)) and len(entry) >= 2:
                # Legacy (px, py[, field]) entries; without an explicit field
                # the active filter is used unless it is the "All" bucket.
                if len(entry) >= 3:
                    lbl_field = entry[2]
                else:
                    lbl_field = field_key if field_key not in ("", "All") else None
                label = _format_grid_label(entry[0], entry[1], lbl_field)
            if label:
                labels.append(label)

        # Preserve order while removing duplicates
        labels = list(dict.fromkeys(labels))
        options = ["All positions"] + labels
        state.qpu_plot_position_options = options
        if state.qpu_plot_position_filter not in options:
            state.qpu_plot_position_filter = options[0]
    except Exception:
        # Best effort: refreshing a dropdown must never break the caller.
        pass
def _filter_series_keys(series_map, field_filter: str, position_filter: str):
    """Return the ``series_map`` keys that pass the field and position filters.

    A field filter of ``""``/``"All"`` and a position filter of
    ``""``/``"All"``/``"All positions"`` (or ``None`` for either) mean "no
    restriction". A position label is resolved through the cached
    ``label_to_keys`` map; an unknown label yields an empty list.
    """
    wanted_field = (field_filter or "All").strip()
    wanted_position = (position_filter or "All positions").strip()
    restrict_field = wanted_field not in ("", "All")
    restrict_position = wanted_position not in ("", "All", "All positions")

    permitted = None
    if restrict_position:
        keys_for_label = (qpu_ts_cache.get("label_to_keys") or {}).get(wanted_position)
        if not keys_for_label:
            return []
        permitted = {(str(fld), int(px), int(py)) for (fld, px, py) in keys_for_label}

    selected = []
    for key in series_map.keys():
        if restrict_field and str(key[0]) != wanted_field:
            continue
        if permitted is not None and (str(key[0]), int(key[1]), int(key[2])) not in permitted:
            continue
        selected.append(key)
    return selected
def _refresh_qpu_plot_figures():
    """Re-render the QPU time-series charts for the current filter state.

    Does nothing when no figure, times or series are cached. Otherwise the
    position options are refreshed, the primary chart is rebuilt for the
    active field/position filters (falling back to the unfiltered figure when
    the filtered rebuild yields nothing), and the secondary "other components"
    chart is shown only when a specific field is selected together with all
    positions and the rebuilt figure contains traces.
    """
    shown_css = "width: 900px; height: 660px; margin: 0 auto;"
    hidden_css = "display: none; width: 900px; height: 660px; margin: 0 auto;"

    def _read_filter(state_key, fallback):
        # Reading state may fail (e.g. key missing); fall back to "no filter".
        try:
            return (getattr(state, state_key) or fallback).strip()
        except Exception:
            return fallback

    field_filter = _read_filter("qpu_plot_filter", "All")
    position_filter = _read_filter("qpu_plot_position_filter", "All positions")

    base_fig = qpu_ts_cache.get("fig")
    cached_times = qpu_ts_cache.get("times") or []
    cached_series = qpu_ts_cache.get("series_map") or {}
    if base_fig is None or not cached_times or not cached_series:
        return

    _update_qpu_position_options(field_filter)

    # Primary chart.
    primary_fig = _rebuild_qpu_fig_filtered(field_filter, position_filter)
    if primary_fig is None:
        primary_fig = base_fig
    if primary_fig is None:
        state.qpu_ts_ready = False
        state.qpu_plot_style = hidden_css
    else:
        try:
            ctrl.qpu_ts_update(primary_fig)
        except Exception:
            pass
        state.qpu_ts_ready = True
        state.qpu_plot_style = shown_css

    # Secondary chart: only for "one field, all positions".
    secondary_fig = None
    specific_field = field_filter not in ("", "All")
    every_position = position_filter in ("", "All", "All positions")
    if specific_field and every_position:
        candidate = _rebuild_qpu_fig_others(field_filter, position_filter)
        if candidate is not None and getattr(candidate, "data", None):
            secondary_fig = candidate

    if secondary_fig is None:
        state.qpu_ts_other_ready = False
        state.qpu_other_plot_style = hidden_css
        return
    try:
        ctrl.qpu_ts_other_update(secondary_fig)
    except Exception:
        pass
    state.qpu_ts_other_ready = True
    state.qpu_other_plot_style = shown_css
def _grid_dims_for_field(field: str, nx: int) -> tuple[int, int]:
    """Return the (width, height) of the sample grid used for ``field``."""
    if field == 'Ez':
        return nx, nx
    if field == 'Hx':
        return nx, nx - 1
    return nx - 1, nx


def _build_qpu_timeseries_plotly_multi(configs, nx: int, T: float, snapshot_dt: float, impulse_pos, progress_callback=None, print_callback=None):
    """Run the monitor configs and build the combined time-series figure.

    Args:
        configs: iterable of dicts with ``field`` ("Ez", "Hx", "Hy" or "All")
            and ``points`` (a string of ``(px, py)`` integer pairs). A config
            without parsable points falls back to ``impulse_pos``.
        nx: grid size; per-field grid dimensions are derived from it.
        T: total time passed to ``qutils.create_time_frames`` / ``run_sve``.
        snapshot_dt: snapshot spacing passed to the same helpers.
        impulse_pos: ``(px, py)`` fallback monitor position, also forwarded to
            ``qutils.run_sve``.
        progress_callback: optional callable receiving overall progress (0-100).
        print_callback: optional callable receiving error text; ``print`` is
            used when it is not given.

    Returns:
        The Plotly figure containing one trace per (field, position) series.

    Side effects:
        Replaces the contents of ``qpu_ts_cache`` (times, series, figure,
        label maps, nx) and resets the field/position filter options on
        ``state``.

    A field whose backend call raises is reported and skipped; the remaining
    fields are still plotted.
    """
    times = qutils.create_time_frames(T, snapshot_dt)
    fig = go.Figure()

    # Expand every config into (field, validated positions) pairs.
    cfg_expanded = []
    for cfg in (configs or []):
        field_type = (cfg.get("field") or "Ez").strip()
        pts_str = str(cfg.get("points") or "").strip()
        # Expand 'All' into Ez, Hx, Hy
        fields = ('Ez', 'Hx', 'Hy') if field_type == 'All' else (field_type,)
        # Parse positions string once
        raw_pts = [tuple(map(int, m)) for m in re.findall(r"\((\d+)\s*,\s*(\d+)\)", pts_str)] or [impulse_pos]
        for f in fields:
            gw, gh = _grid_dims_for_field(f, nx)
            valid = [(int(px), int(py)) for (px, py) in raw_pts if 0 <= px < gw and 0 <= py < gh]
            if valid:
                cfg_expanded.append((f, valid))

    series_map = {}
    positions_by_field = defaultdict(dict)
    key_to_label = {}
    label_to_keys = defaultdict(set)
    max_abs = 0.0
    dashes = ["solid", "dash", "dot", "dashdot"]
    markers = ["circle", "square", "diamond", "triangle-up", "x"]
    total_configs = len(cfg_expanded)

    for idx, (field_type, valid_positions) in enumerate(cfg_expanded):
        # Sub-callback mapping this field's 0-100 progress onto its slice of
        # the overall progress. `idx` is bound as a default so the mapping
        # stays correct even if the backend invokes the callback late.
        def _sub_progress(p, _idx=idx):
            if progress_callback:
                base = (_idx / total_configs) * 100
                fraction = (1 / total_configs) * 100
                progress_callback(base + (p / 100.0) * fraction)

        # Fetch time series from QPU for this field and the validated positions
        try:
            series_map_field = qutils.run_sve(field_type, valid_positions, None, float(T), float(snapshot_dt), int(nx), None, impulse_pos, progress_callback=_sub_progress, print_callback=print_callback)
        except Exception as e:
            msg = f"QPU error for {field_type} positions {valid_positions}: {e}"
            if print_callback:
                print_callback(msg)
            else:
                print(msg)
            continue

        field_series = series_map_field or {}
        cmap = _cmap_for_field(field_type)
        num_pts = len(valid_positions)
        # Labels must be normalized with THIS field's grid size; the expansion
        # loop above ends on the dimensions of whichever field came last.
        gw, gh = _grid_dims_for_field(field_type, nx)

        for i, (px, py) in enumerate(valid_positions):
            ys = field_series.get((px, py))
            # Explicit length checks so array-like series are handled too
            # (truth-testing a multi-element array raises).
            if ys is None or len(ys) == 0 or len(ys) != len(times):
                continue
            series_map[(field_type, px, py)] = list(ys)
            try:
                max_abs = max(max_abs, max((abs(float(v)) for v in ys)))
            except Exception:
                pass
            # Color keyed by index to ensure distinctness
            if num_pts > 1:
                # Distribute from 0.3 (light) to 0.9 (dark)
                s_light = 0.3 + 0.6 * (i / (num_pts - 1))
            else:
                s_light = 0.6  # Medium intensity for single point
            rgba = cmap(s_light)
            color_hex = f"#{int(rgba[0]*255):02x}{int(rgba[1]*255):02x}{int(rgba[2]*255):02x}"
            label = _normalized_position_label(px, py, gw, gh)
            key = (str(field_type), int(px), int(py))
            key_to_label[key] = label
            label_to_keys[label].add(key)
            positions_by_field[str(field_type)][(int(px), int(py))] = {
                "coords": (int(px), int(py)),
                "label": label,
                "field": str(field_type),
            }
            fig.add_trace(
                go.Scatter(
                    x=times,
                    y=ys,
                    mode='lines+markers',
                    name=label,
                    line=dict(color=color_hex, width=2.5, dash=dashes[i % len(dashes)]),
                    marker=dict(size=7, symbol=markers[i % len(markers)], color=color_hex),
                    hovertemplate=f"{field_type} | t=%{{x:.3f}}s<br>Value=%{{y:.6g}}<extra>{label}</extra>",
                )
            )

    # Layout: fixed size, doubled label fonts, refined grid/lines
    unique_fields = sorted({f for (f, _, _) in series_map.keys()})
    fig.update_layout(
        title=f"Time Series ({', '.join(unique_fields) if unique_fields else '—'})",
        height=660, width=900,
        margin=dict(l=50, r=30, t=50, b=50),
        hovermode="x unified",
        legend=dict(orientation='h', yanchor='bottom', y=1.02, xanchor='right', x=1, title_text=""),
        paper_bgcolor="#FFFFFF",
        plot_bgcolor="#FFFFFF",
    )
    fig.update_xaxes(title_text="Time (s)", title_font=dict(size=22), tickfont=dict(size=16), showgrid=True, gridcolor="rgba(0,0,0,.06)", zeroline=False, showline=True, linewidth=1, linecolor="rgba(0,0,0,.3)", showspikes=True, spikemode='across', spikesnap='cursor')
    fig.update_yaxes(title_text="Field Value", title_font=dict(size=22), tickfont=dict(size=16), showgrid=True, gridcolor="rgba(0,0,0,.06)", zeroline=True, zerolinecolor="rgba(0,0,0,.25)", showline=True, linewidth=1, linecolor="rgba(0,0,0,.3)")
    if max_abs > 0:
        # Symmetric y-range with 12% headroom around the largest magnitude.
        pad = 0.12 * max_abs
        fig.update_yaxes(range=[-max_abs - pad, max_abs + pad])

    # Cache and field options
    qpu_ts_cache["times"] = list(times)
    qpu_ts_cache["series_map"] = series_map
    qpu_ts_cache["field"] = ",".join(unique_fields) if len(unique_fields) == 1 else ("multi" if unique_fields else "")
    qpu_ts_cache["fig"] = fig
    qpu_ts_cache["unique_fields"] = list(unique_fields)
    try:
        positions_map_sorted = {}
        all_entries = {}
        for field_name, entry_map in positions_by_field.items():
            entries = [entry_map[key] for key in sorted(entry_map.keys(), key=lambda xy: (xy[0], xy[1]))]
            positions_map_sorted[field_name] = entries
            for entry in entries:
                # First entry wins when several fields share a label.
                all_entries.setdefault(entry["label"], entry)
        positions_map_sorted["All"] = sorted(all_entries.values(), key=lambda entry: (entry["coords"][0], entry["coords"][1]))
        qpu_ts_cache["positions_by_field"] = positions_map_sorted
        qpu_ts_cache["key_to_label"] = key_to_label
        qpu_ts_cache["label_to_keys"] = {lbl: sorted(list(vals)) for lbl, vals in label_to_keys.items()}
        qpu_ts_cache["nx"] = int(nx)
    except Exception:
        # Fall back to empty lookup tables rather than leaving stale ones.
        qpu_ts_cache["positions_by_field"] = {"All": []}
        qpu_ts_cache["key_to_label"] = {}
        qpu_ts_cache["label_to_keys"] = {}
    try:
        state.qpu_plot_field_options = ["All"] + list(unique_fields)
        state.qpu_plot_filter = "All"
        _update_qpu_position_options("All")
    except Exception:
        # Filter dropdowns are cosmetic; never fail the build because of them.
        pass
    return fig
def _rebuild_qpu_fig_filtered(filter_value: str, position_filter: str = "All positions"):
    """Build a figure containing only the cached series matching the filters.

    Args:
        filter_value: field name, or ``""``/``"All"``/``None`` for every field.
        position_filter: position label, or ``""``/``"All"``/``"All positions"``
            /``None`` for every position.

    Returns:
        - the cached unfiltered figure when nothing is cached yet or when
          neither filter restricts anything;
        - ``None`` when the filters match no series;
        - otherwise a new figure with one trace per matching series.
        Any unexpected error falls back to the cached unfiltered figure.
    """
    try:
        fv = (filter_value or "All").strip()
        pf = (position_filter or "All positions").strip()
        fig_all = qpu_ts_cache.get("fig")
        times = qpu_ts_cache.get("times") or []
        series_map = qpu_ts_cache.get("series_map") or {}
        if fig_all is None or not times or not series_map:
            return fig_all
        if fv in ("", "All") and pf in ("", "All", "All positions"):
            return fig_all

        keys = _filter_series_keys(series_map, fv, pf)
        if not keys:
            return None

        fig = go.Figure()
        dashes = ["solid", "dash", "dot", "dashdot"]
        markers = ["circle", "square", "diamond", "triangle-up", "x"]
        max_abs = 0.0
        label_map = qpu_ts_cache.get("key_to_label") or {}
        sorted_keys = sorted(keys, key=lambda x: (str(x[0]), x[1], x[2]))
        num_keys = len(sorted_keys)

        for i, k in enumerate(sorted_keys):
            field_name, px, py = k
            ys = series_map.get(k) or []
            if not ys or len(ys) != len(times):
                continue
            try:
                max_abs = max(max_abs, max((abs(float(v)) for v in ys)))
            except Exception:
                pass
            cmap = _cmap_for_field(field_name)
            # Color keyed by index to ensure distinctness
            if num_keys > 1:
                s_light = 0.3 + 0.6 * (i / (num_keys - 1))
            else:
                s_light = 0.6
            rgba = cmap(s_light)
            color_hex = f"#{int(rgba[0]*255):02x}{int(rgba[1]*255):02x}{int(rgba[2]*255):02x}"
            label = label_map.get((str(field_name), int(px), int(py))) or _format_grid_label(px, py, field_name)
            fig.add_trace(go.Scatter(
                x=times,
                y=ys,
                mode='lines+markers',
                name=label,
                line=dict(color=color_hex, width=2.5, dash=dashes[i % len(dashes)]),
                marker=dict(size=7, symbol=markers[i % len(markers)], color=color_hex),
                hovertemplate=f"{field_name} | t=%{{x:.3f}}s<br>Value=%{{y:.6g}}<extra>{label}</extra>",
            ))

        # Title names whichever filters are actually restricting the view.
        title_parts = []
        if fv not in ("", "All"):
            title_parts.append(fv)
        if pf not in ("", "All", "All positions"):
            title_parts.append(pf)
        suffix = " - ".join(title_parts) if title_parts else "Filtered"
        fig.update_layout(title=f"IBM QPU Time Series ({suffix})", height=660, width=900, margin=dict(l=50, r=30, t=50, b=50), hovermode="x unified", legend=dict(orientation='h', yanchor='bottom', y=1.02, xanchor='right', x=1, title_text=""))
        fig.update_xaxes(title_text="Time (s)", title_font=dict(size=22), tickfont=dict(size=16), showgrid=True, gridcolor="rgba(0,0,0,.06)", zeroline=False, showline=True, linewidth=1, linecolor="rgba(0,0,0,.3)")
        fig.update_yaxes(title_text="Field Value", title_font=dict(size=22), tickfont=dict(size=16), showgrid=True, gridcolor="rgba(0,0,0,.06)", zeroline=True, zerolinecolor="rgba(0,0,0,.25)", showline=True, linewidth=1, linecolor="rgba(0,0,0,.3)")
        if max_abs > 0:
            # Symmetric y-range with 12% headroom around the largest magnitude.
            pad = 0.12 * max_abs
            fig.update_yaxes(range=[-max_abs - pad, max_abs + pad])
        return fig
    except Exception:
        # Best effort: show the unfiltered chart rather than nothing.
        return qpu_ts_cache.get("fig")
# Plotly click selection and CSV export
def on_qpu_ts_click(evt):
    """Mark the snapshot time nearest to a clicked point on the QPU chart.

    Reads the x value of the first clicked point in ``evt``, snaps it to the
    closest cached time, draws a dotted vertical line there on the cached
    figure, pushes the figure to the widget and stores the time in
    ``state.qpu_ts_selected_time``. Missing data or any error makes this a
    silent no-op.
    """
    try:
        clicked_points = evt["points"] if evt and "points" in evt else None
        if not clicked_points:
            return
        click_x = float(clicked_points[0].get("x"))

        cached_times = qpu_ts_cache.get("times") or []
        cached_fig = qpu_ts_cache.get("fig")
        if cached_fig is None or not cached_times:
            return

        distances = np.abs(np.asarray(cached_times) - click_x)
        nearest_time = float(cached_times[int(np.argmin(distances))])

        # Update selection line
        selection_line = dict(
            type="line",
            x0=nearest_time,
            x1=nearest_time,
            y0=0,
            y1=1,
            xref="x",
            yref="paper",
            line=dict(color="#5F259F", width=2, dash="dot"),
        )
        cached_fig.update_layout(shapes=[selection_line])
        qpu_ts_cache["fig"] = cached_fig
        try:
            ctrl.qpu_ts_update(cached_fig)
        except Exception:
            pass
        state.qpu_ts_selected_time = nearest_time
    except Exception:
        pass
def on_qpu_ts_clear():
    """Remove the selection line from the cached QPU chart.

    Clears the figure's shapes, pushes the figure to the widget and resets
    ``state.qpu_ts_selected_time`` to ``None``. Does nothing when no figure is
    cached; errors are swallowed.
    """
    try:
        cached_fig = qpu_ts_cache.get("fig")
        if cached_fig is not None:
            cached_fig.update_layout(shapes=[])
            qpu_ts_cache["fig"] = cached_fig
            try:
                ctrl.qpu_ts_update(cached_fig)
            except Exception:
                pass
            state.qpu_ts_selected_time = None
    except Exception:
        pass
# Expose the chart click/clear handlers on the controller.
ctrl.on_qpu_ts_click = on_qpu_ts_click
ctrl.on_qpu_ts_clear = on_qpu_ts_clear
@state.change("problem_selection")
def on_problem_change(problem_selection, **kwargs):
    """Adapt the geometry choices to the selected problem type.

    Propagation problems offer the square domain, scattering problems the
    square metallic body, and anything else offers both. A geometry selection
    that is no longer offered is cleared. The workflow highlight is computed
    only after that reset, so it reflects the selection that is really in
    effect.
    """
    options_by_problem = {
        "Propagation in a given medium (no bodies)": ["None", "Square Domain", "Geometry 2", "Add"],
        "Scattering from a perfectly conducting body": ["None", "Square Metallic Body", "Geometry 2", "Add"],
    }
    default_options = ["None", "Square Metallic Body", "Square Domain", "Geometry 2", "Add"]
    state.geometry_options = options_by_problem.get(problem_selection, default_options)

    # Reset geometry selection when options change to avoid invalid state
    if state.geometry_selection not in state.geometry_options:
        state.geometry_selection = None

    _apply_workflow_highlights(_determine_workflow_step())
def export_qpu_timeseries_csv():
    """Send the cached QPU time series to the browser as a CSV download.

    The CSV has a ``time_s`` column followed by one column per series. Keys of
    the form ``(field, px, py)`` are headed ``field(px,py)``; legacy
    ``(px, py)`` keys are headed ``(px,py)``. Series shorter than the time
    axis are padded with empty cells.

    The file is assembled in memory, so nothing is left on disk if the
    download call fails. The outcome is written to
    ``state.export_status_message`` and the status banner is always shown.
    """
    import csv
    import io

    try:
        times = qpu_ts_cache.get("times")
        series_map = qpu_ts_cache.get("series_map")
        field = qpu_ts_cache.get("field") or "Ez"
        if not times or not series_map:
            raise ValueError("No QPU time series to export.")
        nx_val = int(state.nx or 0)
        filename = f"qpu_timeseries_{field}_nx{nx_val}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"

        # Detect key layout: (px,py) legacy or (field,px,py)
        keys = list(series_map.keys())
        if keys and len(keys[0]) == 3:
            keys.sort(key=lambda k: (k[0], k[1], k[2]))
            header = [f"{k[0]}({k[1]},{k[2]})" for k in keys]
        else:
            # Legacy single-field
            keys.sort(key=lambda k: (k[0], k[1]))
            header = [f"({k[0]},{k[1]})" for k in keys]
        columns = [series_map.get(k) or [] for k in keys]

        # newline="" lets the csv module control line endings itself.
        buffer = io.StringIO(newline="")
        writer = csv.writer(buffer)
        writer.writerow(["time_s"] + header)
        for i, t in enumerate(times):
            writer.writerow([t] + [col[i] if i < len(col) else None for col in columns])

        # Encode content as base64 for browser download
        content_b64 = base64.b64encode(buffer.getvalue().encode("utf-8")).decode("ascii")
        _server.js_call("utils", "download", filename, f"data:text/csv;base64,{content_b64}")
        state.export_status_message = f"Exported QPU CSV to {filename}"
    except Exception as e:
        state.export_status_message = f"QPU CSV export failed: {e}"
    finally:
        state.show_export_status = True
# Export QPU Plotly figure as PNG (requires kaleido)
def export_qpu_timeseries_png():
    """Send the cached QPU figure to the browser as a PNG download.

    The figure is rendered to a temporary file, read back and pushed as a
    base64 data URI. The temporary file is removed in every case, including
    when rendering fails. The outcome is written to
    ``state.export_status_message`` (with an install hint when the error
    mentions kaleido) and the status banner is always shown.
    """
    temp_path = None
    try:
        fig = qpu_ts_cache.get("fig")
        field = qpu_ts_cache.get("field") or "Ez"
        if fig is None:
            raise ValueError("No QPU time series to export.")
        nx_val = int(state.nx or 0)
        filename = f"qpu_timeseries_{field}_nx{nx_val}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
        # Only reserve a path; the handle is closed before Plotly writes to it.
        with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
            temp_path = Path(tmp.name)
        fig.write_image(str(temp_path), scale=2, width=900, height=660)
        # Encode content as base64 for browser download
        content_b64 = base64.b64encode(temp_path.read_bytes()).decode("ascii")
        _server.js_call("utils", "download", filename, f"data:image/png;base64,{content_b64}")
        state.export_status_message = f"Exported QPU PNG to {filename}"
    except Exception as e:
        msg = str(e)
        if "kaleido" in msg.lower():
            state.export_status_message = "QPU PNG export failed: kaleido is not installed. Run `pip install -U kaleido`."
        else:
            state.export_status_message = f"QPU PNG export failed: {e}"
    finally:
        if temp_path is not None:
            try:
                temp_path.unlink(missing_ok=True)
            except OSError:
                pass  # a leftover temp file must not mask the export result
        state.show_export_status = True
# Export QPU Plotly figure as standalone HTML
def export_qpu_timeseries_html():
    """Send the cached QPU figure to the browser as a standalone HTML page.

    The page loads plotly.js from the CDN. It is written to a temporary file,
    read back and pushed as a base64 data URI; the temporary file is removed
    in every case. The outcome is written to ``state.export_status_message``
    and the status banner is always shown.
    """
    temp_path = None
    try:
        fig = qpu_ts_cache.get("fig")
        field = qpu_ts_cache.get("field") or "Ez"
        if fig is None:
            raise ValueError("No QPU time series to export.")
        nx_val = int(state.nx or 0)
        filename = f"qpu_timeseries_{field}_nx{nx_val}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html"
        # Only reserve a path; the handle is closed before Plotly writes to it.
        with tempfile.NamedTemporaryFile(suffix=".html", delete=False) as tmp:
            temp_path = Path(tmp.name)
        pio.write_html(fig, file=str(temp_path), include_plotlyjs="cdn", full_html=True)
        # Encode content as base64 for browser download
        content_b64 = base64.b64encode(temp_path.read_bytes()).decode("ascii")
        _server.js_call("utils", "download", filename, f"data:text/html;base64,{content_b64}")
        state.export_status_message = f"Exported QPU HTML to {filename}"
    except Exception as e:
        state.export_status_message = f"QPU HTML export failed: {e}"
    finally:
        if temp_path is not None:
            try:
                temp_path.unlink(missing_ok=True)
            except OSError:
                pass  # a leftover temp file must not mask the export result
        state.show_export_status = True
# --- Cache for Simulator Time Series (for export) ---
# "fig" is read by the simulator PNG/HTML export helpers below.
# NOTE(review): the code that fills this cache is not visible here.
sim_ts_cache = {
"fig": None,
"field": None,
}
# --- Export functions for Simulator Time Series ---
def export_sim_timeseries_csv():
    """Export simulator time series data to CSV.

    Sample points come from ``state.timeseries_points`` (normalized pairs,
    scaled by ``nx - 1`` and rounded to grid indices); the grid centre is used
    when none can be parsed. One column is written per point for
    ``state.timeseries_field``, one row per snapshot time. A point outside the
    field's grid is written as ``0.0`` for every field, Ez included.

    The file is assembled in memory, so nothing is left on disk on failure.
    The outcome is written to ``state.export_status_message`` and the status
    banner is always shown.
    """
    import csv
    import io

    try:
        if simulation_data is None or snapshot_times is None:
            raise ValueError("No simulator time series to export.")
        field = state.timeseries_field or "Ez"
        nx_val = int(state.nx or 0)
        if nx_val <= 0:
            # Without a grid size every index below would be meaningless.
            raise ValueError("Grid size (nx) is not set.")
        filename = f"sim_timeseries_{field}_nx{nx_val}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"

        # Parse positions from state
        positions = []
        pts_str = state.timeseries_points or "(0.5, 0.5)"
        for m in _SAMPLE_PAIR_RE.finditer(pts_str):
            x_norm, y_norm = float(m.group(1)), float(m.group(2))
            positions.append((int(round(x_norm * (nx_val - 1))), int(round(y_norm * (nx_val - 1)))))
        if not positions:
            positions = [(nx_val // 2, nx_val // 2)]

        n_cells = nx_val * nx_val
        if field == 'Ez':
            gw, gh = nx_val, nx_val
        elif field == 'Hx':
            gw, gh = nx_val, nx_val - 1
        else:  # Hy
            gw, gh = nx_val - 1, nx_val
        # Loop-invariant: drops every nx-th entry of the trailing Hy block.
        hy_mask = np.arange(1, n_cells + 1) % nx_val != 0

        def _field_grid(frame):
            # Slice the field's block out of one flat frame as a (gh, gw) grid.
            if field == 'Ez':
                return frame[:n_cells].reshape(gh, gw)
            if field == 'Hx':
                return frame[2 * n_cells: 3 * n_cells - nx_val].reshape(gh, gw)
            return frame[-n_cells:][hy_mask].reshape(gh, gw)

        in_bounds = [0 <= px < gw and 0 <= py < gh for (px, py) in positions]
        any_in_bounds = any(in_bounds)

        buffer = io.StringIO(newline="")
        writer = csv.writer(buffer)
        writer.writerow(["time_s"] + [f"({px},{py})" for (px, py) in positions])
        times = np.asarray(snapshot_times)
        n_frames = simulation_data.shape[0]
        for i, t in enumerate(times[:n_frames]):
            grid = _field_grid(simulation_data[i]) if any_in_bounds else None
            writer.writerow(
                [t] + [grid[py, px] if ok else 0.0 for (px, py), ok in zip(positions, in_bounds)]
            )

        content_b64 = base64.b64encode(buffer.getvalue().encode("utf-8")).decode("ascii")
        _server.js_call("utils", "download", filename, f"data:text/csv;base64,{content_b64}")
        state.export_status_message = f"Exported Simulator CSV to {filename}"
    except Exception as e:
        state.export_status_message = f"Simulator CSV export failed: {e}"
    finally:
        state.show_export_status = True
def export_sim_timeseries_png():
    """Export the cached simulator time-series Plotly figure as a PNG download.

    Reads the figure from ``sim_ts_cache["fig"]``, renders it to a temporary
    PNG file, and sends the base64-encoded bytes to the client through the
    ``utils.download`` JS call. The outcome is reported through
    ``state.export_status_message`` and ``state.show_export_status`` is always
    set to ``True`` on exit. Exceptions are reported, not propagated.
    """
    temp_path = None
    try:
        fig = sim_ts_cache.get("fig")
        field = state.timeseries_field or "Ez"
        if fig is None:
            raise ValueError("No simulator time series to export.")
        nx_val = int(state.nx or 0)
        filename = f"sim_timeseries_{field}_nx{nx_val}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
        # Only reserve a unique path; the handle is closed before Plotly writes to it.
        with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
            temp_path = Path(tmp.name)
        fig.write_image(str(temp_path), scale=2, width=900, height=660)
        content_b64 = base64.b64encode(temp_path.read_bytes()).decode("ascii")
        _server.js_call("utils", "download", filename, f"data:image/png;base64,{content_b64}")
        state.export_status_message = f"Exported Simulator PNG to {filename}"
    except Exception as e:
        msg = str(e)
        if "kaleido" in msg.lower():
            state.export_status_message = "Simulator PNG export failed: kaleido is not installed. Run `pip install -U kaleido`."
        else:
            state.export_status_message = f"Simulator PNG export failed: {e}"
    finally:
        # Remove the temp file on every path; previously a failed render left it behind.
        if temp_path is not None:
            try:
                temp_path.unlink(missing_ok=True)
            except OSError:
                # Cleanup is best-effort and must not mask the export result.
                pass
        state.show_export_status = True
def export_sim_timeseries_html():
    """Export the cached simulator time-series Plotly figure as standalone HTML.

    Reads the figure from ``sim_ts_cache["fig"]``, writes it to a temporary
    HTML file (Plotly JS referenced from the CDN), and sends the
    base64-encoded bytes to the client through the ``utils.download`` JS call.
    The outcome is reported through ``state.export_status_message`` and
    ``state.show_export_status`` is always set to ``True`` on exit.
    Exceptions are reported, not propagated.
    """
    temp_path = None
    try:
        fig = sim_ts_cache.get("fig")
        field = state.timeseries_field or "Ez"
        if fig is None:
            raise ValueError("No simulator time series to export.")
        nx_val = int(state.nx or 0)
        filename = f"sim_timeseries_{field}_nx{nx_val}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html"
        # Only reserve a unique path; the handle is closed before Plotly writes to it.
        with tempfile.NamedTemporaryFile(suffix=".html", delete=False) as tmp:
            temp_path = Path(tmp.name)
        pio.write_html(fig, file=str(temp_path), include_plotlyjs="cdn", full_html=True)
        content_b64 = base64.b64encode(temp_path.read_bytes()).decode("ascii")
        _server.js_call("utils", "download", filename, f"data:text/html;base64,{content_b64}")
        state.export_status_message = f"Exported Simulator HTML to {filename}"
    except Exception as e:
        state.export_status_message = f"Simulator HTML export failed: {e}"
    finally:
        # Remove the temp file on every path; previously a failed write left it behind.
        if temp_path is not None:
            try:
                temp_path.unlink(missing_ok=True)
            except OSError:
                # Cleanup is best-effort and must not mask the export result.
                pass
        state.show_export_status = True
def setup_surface_plot_data(simulation_data, nx):
    """Split raw snapshots into per-field frames and derive plot scaling.

    Each row of ``simulation_data`` is sliced into an ``Ez`` (nx, nx),
    ``Hx`` (nx-1, nx) and ``Hy`` (nx, nx-1) array. The frames, the per-field
    colour limits, the integer coordinate grids and the global ``z_scale``
    are stored in module-level variables.
    """
    global data_frames, z_scale, X_grids, Y_grids, surface_clims
    cells = nx * nx
    # Keeps every entry of the trailing nx*nx block except each nx-th one.
    hy_keep = np.arange(1, cells + 1) % nx != 0
    field_names = ('Ez', 'Hx', 'Hy')
    data_frames = {name: [] for name in field_names}
    surface_clims = {name: [np.inf, -np.inf] for name in field_names}

    for snapshot in simulation_data:
        components = {
            'Ez': snapshot[:cells].reshape(nx, nx),
            'Hx': snapshot[2 * cells:3 * cells - nx].reshape(nx - 1, nx),
            'Hy': snapshot[-cells:][hy_keep].reshape(nx, nx - 1),
        }
        for name in field_names:
            data_frames[name].append(components[name])
        for name in field_names:
            frame = components[name]
            if frame.size > 0:
                lowest, highest = surface_clims[name]
                surface_clims[name][0] = min(lowest, frame.min())
                surface_clims[name][1] = max(highest, frame.max())

    # Widen degenerate limits so the colour range is never zero-width.
    for bounds in surface_clims.values():
        if bounds[0] == bounds[1]:
            bounds[0] -= 1e-9
            bounds[1] += 1e-9

    # Integer grid coordinates keep the output plots on the node grid.
    full_axis = np.arange(nx)
    short_axis = np.arange(nx - 1)
    X_grids['Ez'], Y_grids['Ez'] = np.meshgrid(full_axis, full_axis)
    X_grids['Hx'], Y_grids['Hx'] = np.meshgrid(full_axis, short_axis)
    X_grids['Hy'], Y_grids['Hy'] = np.meshgrid(short_axis, full_axis)

    # Largest finite magnitude among all colour limits drives the z scaling.
    magnitudes = [
        abs(float(limit))
        for bounds in surface_clims.values()
        for limit in bounds
        if np.isfinite(limit)
    ]
    peak = max(magnitudes) if magnitudes else 1e-9
    z_scale = (nx / 2) / max(peak, 1e-9)
def run_simulation_only():
    """Validate the current selections and run the simulation on the chosen backend.

    Flow:
      1. Reject the run if no geometry or no initial state is selected.
      2. Mark the UI as running, clear the plotter and build the initial state.
      3. If ``state.backend_type == "QPU"``, build the QPU time-series figure
         and return.
      4. Otherwise call ``run_sim``, post-process the snapshots and plot.

    Every failure after the UI has been marked as running restores the idle
    state (``is_running`` False, Stop button disabled), so an exception in the
    solver or in post-processing can no longer leave the UI stuck in "Running".
    """
    global simulation_data, current_mesh, snapshot_times, stop_simulation

    def _fail(error_text, status_text):
        # Put the UI back into its idle error state after a run has started.
        state.error_message = error_text
        state.status_message = status_text
        state.status_type = "error"
        state.show_progress = False
        state.is_running = False
        state.run_button_text = "RUN!"
        state.stop_button_disabled = True

    # Require selections before running
    if not state.geometry_selection:
        state.error_message = "Please select a geometry before running the simulation."
        log_to_console("Error: Please select a geometry before running.")
        state.status_visible = True
        state.status_message = "Error: Please select a geometry before running."
        state.status_type = "error"
        state.show_progress = False
        state.is_running = False
        state.run_button_text = "RUN!"
        return
    if not state.dist_type:
        state.error_message = "Please select an initial state before running the simulation."
        log_to_console("Error: Please select an initial state before running.")
        state.status_visible = True
        state.status_message = "Error: Please select an initial state before running."
        state.status_type = "error"
        state.show_progress = False
        state.is_running = False
        state.run_button_text = "RUN!"
        return

    # Show status: Starting simulation
    state.status_visible = True
    state.status_message = "Initializing simulation..."
    log_to_console("Initializing simulation...")
    state.status_type = "info"
    state.show_progress = True
    state.simulation_progress = 0
    last_logged_percent = 0

    def _progress_callback(percent):
        nonlocal last_logged_percent
        state.simulation_progress = percent
        # Log every 10%
        if percent - last_logged_percent >= 10:
            log_to_console(f"Simulation progress: {int(percent)}%")
            last_logged_percent = percent

    # Reset stop flag and enable Stop button at start
    stop_simulation = False
    state.stop_button_disabled = False
    plotter.clear()
    current_mesh = None
    state.error_message = ""
    state.is_running = True
    state.simulation_has_run = False
    state.run_button_text = "Running"
    ctrl.view_update()

    # nx / T may still be unset (None) or malformed; report instead of crashing
    # with the UI already flagged as running.
    try:
        nx, T = int(state.nx), float(state.T)
    except (TypeError, ValueError) as e:
        _fail(f"Invalid simulation parameters: {e}", f"Error: {e}")
        return
    na, R = 1, 4

    try:
        state.status_message = "Creating initial state..."
        state.simulation_progress = 10
        if state.dist_type == "Delta":
            initial_state = create_impulse_state_from_pos((nx, nx), (float(state.impulse_x), float(state.impulse_y)))
        else:
            initial_state = create_gaussian_state_from_pos((nx, nx), (float(state.mu_x), float(state.mu_y)), (float(state.sigma_x), float(state.sigma_y)))
    except (TypeError, ValueError) as e:
        # TypeError covers float(None) when an excitation parameter is unset.
        _fail(f"Initial State Error: {e}", f"Error: {e}")
        return

    # If QPU selected, build QPU time series chart and return
    if state.backend_type == "QPU":
        try:
            print("Running QPU...")
            state.status_message = "Running QPU simulation..."
            state.simulation_progress = 20
            state.qpu_ts_ready = False
            state.qpu_plot_style = "display: none; width: 900px; height: 660px; margin: 0 auto;"
            state.qpu_ts_other_ready = False
            state.qpu_other_plot_style = "display: none; width: 900px; height: 660px; margin: 0 auto;"
            # Inputs for QPU
            snapshot_dt = float(state.dt_user)
            # Impulse index for QPU helper
            ix_imp, iy_imp = _nearest_node_index(float(state.impulse_x), float(state.impulse_y), nx)
            impulse_pos = (ix_imp, iy_imp)
            # Build configs from primitive slots: primary + additional slots (2..1+count)
            configs = [{
                "field": (state.qpu_field_components or "Ez"),
                "points": (state.qpu_monitor_gridpoints or ""),
            }]
            try:
                cnt = int(state.qpu_monitor_count or 0)
            except Exception:
                cnt = 0
            for slot_num in range(2, 2 + cnt):
                f = getattr(state, f"qpu_field_components_{slot_num}", "Ez") or "Ez"
                p = getattr(state, f"qpu_monitor_gridpoints_{slot_num}", "") or ""
                configs.append({"field": f, "points": p})
            state.status_message = "Building QPU time series..."
            state.simulation_progress = 60
            # Build and render Plotly chart (multi-config)
            fig = _build_qpu_timeseries_plotly_multi(configs, nx, T, snapshot_dt, impulse_pos, progress_callback=_progress_callback, print_callback=log_to_console)
            qpu_ts_cache["fig"] = fig
            try:
                ctrl.qpu_ts_update(fig)
            except Exception:
                # Best-effort push to the chart widget.
                pass
            state.simulation_has_run = True
            state.run_button_text = "Successful!"
            state.simulation_progress = 100
            state.status_message = "QPU simulation completed successfully!"
            log_to_console("Simulation Completed")
            state.status_type = "success"
            state.show_progress = False
            # Show chart only if it has data
            ready = bool(getattr(fig, "data", None)) and len(fig.data) > 0
            state.qpu_ts_ready = ready
            state.qpu_plot_style = (
                "width: 900px; height: 660px; margin: 0 auto;"
                if ready else "display: none; width: 900px; height: 660px; margin: 0 auto;"
            )
            # Secondary stays hidden after run; it will appear when a specific component is selected
            state.qpu_ts_other_ready = False
            state.qpu_other_plot_style = "display: none; width: 900px; height: 660px; margin: 0 auto;"
            if not ready:
                state.error_message = "No QPU time series generated. Check Δt, T, nx, and monitor points."
                state.status_message = "Warning: No QPU time series generated."
                state.status_type = "warning"
            log_to_console("QPU complete.")
        except Exception as e:
            state.error_message = f"QPU run failed: {e}"
            state.status_message = f"QPU Error: {e}"
            state.status_type = "error"
            state.show_progress = False
            state.run_button_text = "RUN!"
            state.qpu_ts_ready = False
            log_to_console(f"QPU error: {e}")
        finally:
            state.is_running = False
            state.stop_button_disabled = True
        ctrl.view_update()
        return

    def _stop_check():
        return stop_simulation

    # Simulator path: guarded like the QPU branch so a solver or plotting
    # failure is reported and the run/stop controls are always restored.
    try:
        log_to_console("Running simulation...")
        state.status_message = "Running simulation... This may take a while."
        state.simulation_progress = 30
        # Pass user-defined snapshot Δt; keep solver dt=0.1 inside run_sim
        snapshot_dt = float(state.dt_user)
        state.simulation_progress = 50
        simulation_data, snapshot_times = run_sim(nx, na, R, initial_state, T, snapshot_dt=snapshot_dt, stop_check=_stop_check, progress_callback=_progress_callback, print_callback=log_to_console)
        log_to_console("Simulation complete.")
        state.simulation_progress = 80
        state.status_message = "Processing simulation results..."
        if simulation_data.size > 0:
            setup_surface_plot_data(simulation_data, nx)
            state.simulation_has_run = True
            state.run_button_text = "Successful!"
            state.simulation_progress = 100
            state.status_message = "Simulation completed successfully!"
            state.status_type = "success"
            state.show_progress = False
            # Allow the view to use full area after run (remove strict square)
            generate_plot()
        else:
            state.error_message = "Simulation produced no data. Check parameters (e.g., T > 0)."
            state.status_message = "Error: Simulation produced no data."
            state.status_type = "error"
            state.show_progress = False
            state.run_button_text = "RUN!"
    except Exception as e:
        state.error_message = f"Simulation failed: {e}"
        state.status_message = f"Error: {e}"
        state.status_type = "error"
        state.show_progress = False
        state.run_button_text = "RUN!"
        log_to_console(f"Simulation error: {e}")
    finally:
        state.is_running = False
        state.stop_button_disabled = True
def reset_to_defaults():
    """Restore every parameter, cache and workflow highlight to its default."""
    global simulation_data, current_mesh, data_frames, stop_simulation, snapshot_times

    # Signal any running simulation to stop, then drop cached results.
    stop_simulation = True
    simulation_data = current_mesh = data_frames = snapshot_times = None

    hidden_plot_style = "display: none; width: 900px; height: 660px; margin: 0 auto;"
    idle_card_style = "font-size: 0.8rem; border: 1px solid transparent; transition: box-shadow 0.2s ease;"

    # Built incrementally so the key order matches the declaration order.
    defaults = {
        "dist_type": None,
        "impulse_x": 0.5,
        "impulse_y": 0.5,
        "peak_pair": "(0.5, 0.5)",
        "mu_x": 0.5,
        "mu_y": 0.5,
        "sigma_x": 0.25,
        "sigma_y": 0.15,
        "mu_pair": "(0.5, 0.5)",
        "sigma_pair": "(0.25, 0.15)",
        "nx": None,
        "T": 10.0,
        "time_val": 0.0,
        "output_type": "Surface Plot",
        "surface_field": "Ez",
        "timeseries_field": "Ez",
        "timeseries_points": "(0.5, 0.5)",
        "timeseries_gridpoints": "",
        "timeseries_point_info": "",
        "error_message": "",
        "excitation_info_message": "",
        "excitation_config_open": False,
        "is_running": False,
        "simulation_has_run": False,
        "geometry_selection": None,
        "coeff_permittivity": 1.0,
        "coeff_permeability": 1.0,
        "run_button_text": "RUN!",
        "backend_type": None,
        "selected_simulator": "IBM Qiskit simulator",
        "selected_qpu": "IBM QPU",
        "stop_button_disabled": True,
        "export_format": "vtk",
        "nx_slider_index": None,
        "dt_user": 0.1,
        "temporal_warning": "",
        "qpu_field_components": "Ez",
        "qpu_monitor_gridpoints": "",
        "qpu_monitor_samples": "(0.5, 0.5)",
        "qpu_monitor_sample_info": "",
        "qpu_monitor_count": 0,
        "qpu_plot_filter": "All",
        "qpu_plot_field_options": ["All"],
        "qpu_plot_position_filter": "All positions",
        "qpu_plot_position_options": ["All positions"],
        "qpu_ts_ready": False,
        "qpu_plot_style": hidden_plot_style,
        "qpu_ts_other_ready": False,
        "qpu_other_plot_style": hidden_plot_style,
        # Restore square aspect for initial preview
        "pyvista_view_style": "aspect-ratio: 1 / 1; width: 100%;",
    }

    # Additional QPU monitor slots 2..5 all share the same blank defaults.
    for slot in range(2, 6):
        defaults[f"qpu_field_components_{slot}"] = "Ez"
        defaults[f"qpu_monitor_gridpoints_{slot}"] = ""
        defaults[f"qpu_monitor_samples_{slot}"] = ""
        defaults[f"qpu_monitor_sample_info_{slot}"] = ""

    # Workflow guidance styling: only the overview card is highlighted.
    defaults["workflow_step"] = 0
    defaults["overview_card_style"] = "font-size: 0.8rem; border: 2px solid #5F259F; box-shadow: 0 0 12px rgba(95,37,159,0.4); transition: box-shadow 0.2s ease;"
    for card in ("geometry", "excitation", "meshing", "backend", "output"):
        defaults[f"{card}_card_style"] = idle_card_style

    state.update(defaults)

    qpu_ts_cache.update({
        "times": None,
        "series_map": None,
        "field": None,
        "fig": None,
        "positions_by_field": {"All": []},
        "key_to_label": {},
        "label_to_keys": {},
        "nx": None,
    })

    # Ensure stop flag is cleared for next run
    stop_simulation = False
    _refresh_all_qpu_sample_slots()
    _update_sim_monitor_points()
    _apply_workflow_highlights(0)
    # Update the preview with default values
    update_initial_state_preview()
    print("Reset to default settings")
@state.change("peak_pair")
def sync_peak_pair(peak_pair, **kwargs):
    """Parse the Peak text "(x, y)" and mirror it into impulse_x / impulse_y.

    Both coordinates are clamped to [0, 1]. On a parse failure the excitation
    error message is set instead. The excitation info message is refreshed in
    either case.
    """
    pair_pattern = r"\(\s*([0-9]*\.?[0-9]+)\s*,\s*([0-9]*\.?[0-9]+)\s*\)"
    try:
        parsed = re.match(pair_pattern, str(peak_pair))
        if parsed is None:
            raise ValueError("Invalid format")
        x_norm, y_norm = (
            max(0.0, min(1.0, float(token))) for token in parsed.groups()
        )
        state.impulse_x = x_norm
        state.impulse_y = y_norm
        state.excitation_error_message = ""
    except Exception:
        state.excitation_error_message = "Invalid Peak. Use format (x, y) in [0,1]."
    finally:
        update_excitation_info_message()
@state.change("mu_pair")
def sync_mu_pair(mu_pair, **kwargs):
    """Parse the Mu text "(x, y)" and mirror it into mu_x / mu_y.

    Signed numbers are accepted by the pattern and then clamped to [0, 1].
    On a parse failure the excitation error message is set instead. The
    excitation info message is refreshed in either case.
    """
    pair_pattern = r"\(\s*([-+]?[0-9]*\.?[0-9]+)\s*,\s*([-+]?[0-9]*\.?[0-9]+)\s*\)"
    try:
        parsed = re.match(pair_pattern, str(mu_pair))
        if parsed is None:
            raise ValueError("Invalid format")
        x_norm, y_norm = (
            max(0.0, min(1.0, float(token))) for token in parsed.groups()
        )
        state.mu_x = x_norm
        state.mu_y = y_norm
        state.excitation_error_message = ""
    except Exception:
        state.excitation_error_message = "Invalid Mu. Use format (x, y) in [0,1]."
    finally:
        update_excitation_info_message()
def stop_simulation_handler():
    """Request that the running simulation stop and disable the Stop button."""
    global stop_simulation
    # Raise the module-level stop flag.
    stop_simulation = True
    # A second click would be redundant, so grey the button out.
    state.stop_button_disabled = True
    print("Stopping simulation...")
def _build_sim_timeseries_plotly(field_type: str, positions, nx: int, times, sim_data):
    """Build the Plotly time-series figure for simulator output.

    Parameters:
        field_type: 'Ez', 'Hx', 'Hy', or 'All' (one group of traces per field).
            Any other value is handled by the Hy branch.
        positions: iterable of integer (px, py) grid indices to monitor.
            Points outside a field's grid are skipped for that field.
        nx: number of grid points per direction.
        times: snapshot times used as the x axis; when None the frame index
            is used instead.
        sim_data: 2-D array indexed as [frame, flat_index]; Ez is read from
            the first nx*nx columns, Hx from columns 2*nx*nx .. 3*nx*nx-nx,
            and Hy from the last nx*nx columns with every nx-th entry dropped.

    Returns:
        A ``go.Figure``. When there is nothing to plot an empty, styled figure
        is returned. If building fails, a blank 900x660 figure is returned, or
        None if even that cannot be created. Exceptions are never propagated.
    """
    try:
        import plotly.graph_objects as go
        import numpy as _np
        from matplotlib import cm as _cm

        def _rgba_to_hex(rgba):
            r, g, b, _alpha = rgba
            return "#%02x%02x%02x" % (int(r*255), int(g*255), int(b*255))

        n_frames = int(sim_data.shape[0]) if sim_data is not None else 0
        time_axis = _np.asarray(times) if times is not None else _np.arange(n_frames)

        # Helper to get grid dims (width, height) per field
        def _dims(f):
            if f == 'Ez':
                return nx, nx
            if f == 'Hx':
                return nx, nx - 1
            return nx - 1, nx  # Hy

        # Validate and normalize positions against a given field
        def _valid_positions(f, pts):
            gw, gh = _dims(f)
            return [(int(px), int(py)) for (px, py) in pts if 0 <= px < gw and 0 <= py < gh]

        fig = go.Figure()
        if not positions or sim_data is None or n_frames == 0:
            fig.update_layout(
                title="Time Series (Simulator)",
                height=660, width=900,
                margin=dict(l=50, r=30, t=50, b=50),
                xaxis=dict(title="Time (s)", title_font=dict(size=22), tickfont=dict(size=16), showline=True, linewidth=1, linecolor="rgba(0,0,0,.3)", gridcolor="rgba(0,0,0,.06)", showspikes=True, spikemode='across', spikesnap='cursor'),
                yaxis=dict(title="Field Amplitude", title_font=dict(size=22), tickfont=dict(size=16), showline=True, linewidth=1, linecolor="rgba(0,0,0,.3)", gridcolor="rgba(0,0,0,.06)", zeroline=True, zerolinecolor="rgba(0,0,0,.25)"),
                hovermode="x unified",
                legend=dict(orientation='h', yanchor='bottom', y=1.02, xanchor='right', x=1),
            )
            return fig

        # Field -> colormap mapping
        cmap_map = {
            'Ez': _cm.Reds,
            'Hx': _cm.Greens,
            'Hy': _cm.Blues,
        }
        dash_styles = ["solid", "dash", "dot", "dashdot"]
        marker_symbols = ["circle", "square", "diamond", "triangle-up", "x"]

        # Add traces for a single field; returns the largest |value| plotted.
        def _add_field_traces(f_name: str, pts):
            gw, gh = _dims(f_name)
            valid_pts = _valid_positions(f_name, pts)
            if not valid_pts:
                return 0.0

            # Slice and reshape the field block once per field (all frames at
            # once) instead of once per point and, for Hy, once per frame.
            if f_name == 'Ez':
                block = None  # Ez is read straight from its flat column
            elif f_name == 'Hx':
                block = sim_data[:, 2*nx*nx : 3*nx*nx-nx].reshape(n_frames, gh, gw)
            else:  # Hy
                mask = _np.arange(1, nx * nx + 1) % nx != 0
                block = sim_data[:, -nx*nx:][:, mask].reshape(n_frames, gh, gw)

            max_abs_local = 0.0
            num_keys = len(valid_pts)
            for i, (px, py) in enumerate(valid_pts):
                if block is None:
                    values = sim_data[:, py * gw + px]
                else:
                    values = block[:, py, px]
                try:
                    max_abs_local = max(max_abs_local, float(_np.max(_np.abs(values))))
                except (TypeError, ValueError):
                    # Non-numeric or empty series: leave the running maximum as is.
                    pass
                # Color keyed by index to ensure distinctness
                if num_keys > 1:
                    s_light = 0.3 + 0.6 * (i / (num_keys - 1))
                else:
                    s_light = 0.6
                color_hex = _rgba_to_hex(cmap_map.get(f_name, _cm.Blues)(s_light))
                label = _normalized_position_label(px, py, gw, gh)
                fig.add_trace(go.Scatter(
                    x=time_axis,
                    y=values,
                    mode='lines+markers',
                    name=label,
                    line=dict(color=color_hex, width=2.5, dash=dash_styles[i % len(dash_styles)]),
                    marker=dict(size=7, symbol=marker_symbols[i % len(marker_symbols)], color=color_hex, line=dict(width=0)),
                    hovertemplate=f"{f_name} | t=%{{x:.3f}}s<br>Value=%{{y:.6g}}<extra>{label}</extra>",
                ))
            return max_abs_local

        field_key = str(field_type)
        fields_to_plot = ('Ez', 'Hx', 'Hy') if field_key == 'All' else (field_key,)
        max_abs = 0.0
        for f in fields_to_plot:
            max_abs = max(max_abs, _add_field_traces(f, positions))

        # Layout styling: doubled label fonts, refined gridlines
        title_suffix = field_key if field_key != 'All' else 'Ez, Hx, Hy'
        fig.update_layout(
            title=f"Time Series (Simulator: {title_suffix})",
            height=660, width=900,
            margin=dict(l=50, r=30, t=50, b=50),
            hovermode="x unified",
            legend=dict(orientation='h', yanchor='bottom', y=1.02, xanchor='right', x=1, title_text=""),
            paper_bgcolor="#FFFFFF",
            plot_bgcolor="#FFFFFF",
        )
        fig.update_xaxes(
            title_text="Time (s)", title_font=dict(size=22), tickfont=dict(size=16),
            showgrid=True, gridcolor="rgba(95,37,159,0.08)", zeroline=False,
            showline=True, linewidth=1, linecolor="rgba(0,0,0,.2)",
            showspikes=True, spikemode='across', spikesnap='cursor'
        )
        fig.update_yaxes(
            title_text="Field Amplitude", title_font=dict(size=22), tickfont=dict(size=16),
            showgrid=True, gridcolor="rgba(95,37,159,0.08)", zeroline=True, zerolinecolor="rgba(0,0,0,.25)",
            showline=True, linewidth=1, linecolor="rgba(0,0,0,.2)"
        )
        # Symmetric y range around zero with 12% headroom.
        if max_abs > 0:
            pad = 0.12 * max_abs
            fig.update_yaxes(range=[-max_abs - pad, max_abs + pad])
        return fig
    except Exception:
        # Best-effort fallback: a blank figure keeps the UI widget usable.
        try:
            import plotly.graph_objects as go
            return go.Figure(layout=dict(height=660, width=900))
        except Exception:
            return None
def generate_plot():
    """Render the selected output (surface plot or time series) for the last run.

    Does nothing until ``state.simulation_has_run`` is set. The surface plot
    is delegated to ``redraw_surface_plot``. For the time series, monitor
    positions are parsed from ``state.timeseries_gridpoints``, the Plotly
    figure is built and cached in ``sim_ts_cache`` for export, and pushed to
    the chart widget. Time-series failures are reported through
    ``state.error_message`` instead of being raised.
    """
    if not state.simulation_has_run:
        return
    plotter.clear()
    try:
        plotter.disable_picking()
    except Exception:
        # Best-effort: a failure to disable picking must not block the redraw.
        pass
    if state.output_type == "Surface Plot":
        redraw_surface_plot()
    else:  # Time Series -> Plotly for Simulator
        try:
            # Parsed inside the guard so a bad nx is reported as a plotting error.
            nx = int(state.nx)
            points_str = state.timeseries_gridpoints or ""
            positions = [tuple(map(int, match)) for match in re.findall(r'\((\d+)\s*,\s*(\d+)\)', points_str)]
            if not positions and (state.timeseries_points or "").strip():
                raise ValueError("No valid monitor positions found. Enter (x, y) pairs in [0,1] x [0,1].")
            fig = _build_sim_timeseries_plotly(state.timeseries_field, positions, nx, snapshot_times, simulation_data)
            if fig is not None:
                # Cache the figure for export
                sim_ts_cache["fig"] = fig
                sim_ts_cache["field"] = state.timeseries_field
                try:
                    ctrl.sim_ts_update(fig)
                except Exception:
                    # Best-effort push to the chart widget.
                    pass
        except Exception as e:
            state.error_message = f"Plotting Error: {e}"
    ctrl.view_update()
def redraw_surface_plot():
    """Draw the surface for the selected field at the snapshot nearest to time_val.

    Clears the plotter first. Returns early, leaving it empty, when no frames
    or snapshot times are available. The resulting mesh is stored in the
    module-level ``current_mesh``.
    """
    global current_mesh
    plotter.clear()
    field = state.surface_field
    no_frames = data_frames is None or not data_frames.get(field)
    if no_frames or snapshot_times is None or len(snapshot_times) == 0:
        return

    # Pick the frame whose snapshot time is closest to the requested time,
    # clamped to the frames that actually exist.
    requested = float(state.time_val)
    offsets = np.abs(np.asarray(snapshot_times) - requested)
    frame_idx = int(np.argmin(offsets))
    last_idx = len(data_frames[field]) - 1
    frame_idx = max(0, min(frame_idx, last_idx))
    frame = data_frames[field][frame_idx]

    # Heights are scaled by z_scale; the raw amplitudes drive the colours.
    cloud = np.c_[
        X_grids[field].ravel(),
        Y_grids[field].ravel(),
        frame.ravel() * z_scale,
    ]
    surface = pv.PolyData(cloud).delaunay_2d()
    surface['scalars'] = frame.ravel()
    current_mesh = surface

    plotter.add_mesh(
        surface,
        scalars='scalars',
        clim=surface_clims[field],
        cmap="Blues",
        show_scalar_bar=False,
        show_edges=True,
        edge_color='grey',
        line_width=0.5,
    )
    plotter.add_scalar_bar(title=f"{field} Amplitude")
    try:
        plotter.disable_picking()
    except Exception:
        pass
    plotter.enable_point_picking(callback=update_value_display, show_message=False)
    plotter.add_axes()
    plotter.view_isometric()
    try:
        plotter.camera.parallel_projection = True
    except Exception:
        pass
    ctrl.view_update()
# Helper: add a dotted unit grid (0..1) overlay in light Synopsys purple
def _add_dotted_unit_grid(plotter, ticks=(0.0, 0.25, 0.5, 0.75, 1.0), segments=48, gap_ratio=0.4, color="#AE8BD8", line_width=0.2):
try:
step = 1.0 / float(max(segments, 1))
seg_len = step * float(max(0.0, min(1.0, 1.0 - gap_ratio)))
pts = []
lines = []
# Horizontal dotted lines at given y=tick
for y in ticks:
pos = 0.0
while pos < 1.0 - 1e-9:
y0, y1 = pos, min(pos + seg_len, 1.0)
pts.extend([(0.0, y, 0.0), (1.0, y, 0.0)]) # end points along x (we'll segment via multiple x positions)
# Replace with segmented along X
pts[-2] = (pos, y, 0.0)
pts[-1] = (y1 if seg_len > 0 else pos, y, 0.0)
i0 = len(pts) - 2
lines.extend([2, i0, i0 + 1])
pos += step
# Vertical dotted lines at given x=tick
for x in ticks:
pos = 0.0
while pos < 1.0 - 1e-9:
y0, y1 = pos, min(pos + seg_len, 1.0)
pts.extend([(x, pos, 0.0), (x, y1 if seg_len > 0 else pos, 0.0)])
i0 = len(pts) - 2
lines.extend([2, i0, i0 + 1])
pos += step
if pts and lines:
poly = pv.PolyData(np.array(pts))
poly.lines = np.array(lines)
plotter.add_mesh(poly, color=color, line_width=line_width, name="dotted_unit_grid", pickable=False)
except Exception:
pass
# Scaled dotted unit grid overlay for integer-coordinate previews (Delta/Gaussian)
def _add_dotted_unit_grid_scaled(plotter, denom, ticks=(0.0, 0.25, 0.5, 0.75, 1.0), segments=48, gap_ratio=0.6, color="#AE8BD8", line_width=1.0, name="dotted_unit_grid_preview"):
"""Overlay a 0–1 dotted grid scaled to [0, denom] on the XY plane without changing mesh coordinates."""
try:
step = 1.0 / float(max(segments, 1))
seg_len = step * float(max(0.0, min(1.0, 1.0 - gap_ratio)))
# Set a z slightly below mesh to avoid z-fighting
try:
z0 = float(current_mesh.points[:, 2].min()) - 1e-6 if current_mesh is not None else 0.0
except Exception:
z0 = 0.0
pts, lines = [], []
# Vertical lines at x = t * denom
for t in ticks:
x = float(t) * float(denom)
pos = 0.0
while pos < 1.0 - 1e-9:
y0 = pos * denom
y1 = min(pos + seg_len, 1.0) * denom
pts.extend([(x, y0, z0), (x, y1, z0)])
i0 = len(pts) - 2
lines.extend([2, i0, i0 + 1])
pos += step
# Horizontal lines at y = t * denom
for t in ticks:
y = float(t) * float(denom)
pos = 0.0
while pos < 1.0 - 1e-9:
x0 = pos * denom
x1 = min(pos + seg_len, 1.0) * denom
pts.extend([(x0, y, z0), (x1, y, z0)])
i0 = len(pts) - 2
lines.extend([2, i0, i0 + 1])
pos += step
try:
plotter.remove_actor(name)
except Exception:
pass
if pts and lines:
poly = pv.PolyData(np.array(pts))
poly.lines = np.array(lines)
plotter.add_mesh(poly, color=color, line_width=line_width, name=name, pickable=False)
except Exception:
pass
# Helpers for square hole edge computation/alignment on a normalized [0,1] grid
def _nearest_gridline(val: float, nx: int) -> float:
denom = float(max(int(nx) - 1, 1))
return round(float(val) * denom) / denom
def _compute_hole_edges(nx: int, cx: float, cy: float, a: float, snap: bool = True):
"""Compute square hole edges (xL, xR, yB, yT) in [0,1].
- nx: points per direction; grid lines at k/(nx-1).
- (cx, cy): center in (0,1).
- a: edge length in (0,1].
- snap: if True, snap edges to nearest grid lines; else require exact alignment.
Returns tuple or None when invalid/out-of-bounds/misaligned.
"""
try:
nx = int(nx)
cx = float(cx)
cy = float(cy)
a = float(a)
except Exception:
return None
if not (a > 0.0):
return None
half = a / 2.0
xL, xR = cx - half, cx + half
yB, yT = cy - half, cy + half
# Must be strictly inside domain to allow removing interior cells safely
if not (0.0 < xL < xR < 1.0 and 0.0 < yB < yT < 1.0):
return None
if snap:
xL_s = _nearest_gridline(xL, nx)
xR_s = _nearest_gridline(xR, nx)
yB_s = _nearest_gridline(yB, nx)
yT_s = _nearest_gridline(yT, nx)
# Ensure non-degenerate after snapping; attempt a minimal adjustment if equal
if xL_s >= xR_s or yB_s >= yT_s:
step = 1.0 / float(max(nx - 1, 1))
if xL_s >= xR_s:
if xL_s - step > 0.0:
xL_s -= step
elif xR_s + step < 1.0:
xR_s += step
if yB_s >= yT_s:
if yB_s - step > 0.0:
yB_s -= step
elif yT_s + step < 1.0:
yT_s += step
if xL_s >= xR_s or yB_s >= yT_s:
return None
return (xL_s, xR_s, yB_s, yT_s)
else:
# Require edges to already lie on grid lines
denom = float(max(nx - 1, 1))
tol = 1e-9
def _aligned(v: float) -> bool:
return abs(v * denom - round(v * denom)) < tol
if all(_aligned(v) for v in (xL, xR, yB, yT)):
return (xL, xR, yB, yT)
return None
def _build_geometry_placeholder(message: str) -> go.Figure:
    """Return an axis-free figure showing only ``message`` at its center."""
    placeholder = go.Figure()
    placeholder.add_annotation(
        text=message,
        x=0.5,
        y=0.5,
        showarrow=False,
        font={"size": 18, "color": "#5F259F"},
    )
    # Hide both axes so only the annotation remains visible.
    placeholder.update_xaxes(visible=False)
    placeholder.update_yaxes(visible=False)
    layout_options = {
        "template": "plotly_white",
        "margin": {"l": 20, "r": 20, "t": 40, "b": 20},
        "paper_bgcolor": "#ffffff",
        "plot_bgcolor": "#ffffff",
        "height": 460,
        "showlegend": False,
    }
    placeholder.update_layout(**layout_options)
    return placeholder
def _build_square_domain_plot(
    nx: int,
    title: str,
    hole_edges=None,
    *,
    show_edges: bool = True,
    dense_grid: bool = False,
) -> go.Figure:
    """Build a 3-D Plotly figure of the flat unit-square domain.

    Parameters:
        nx: grid points per direction; values below 3 are raised to 3.
        title: figure title.
        hole_edges: optional (xL, xR, yB, yT) tuple; grid nodes inside this
            closed rectangle are blanked out of the surface and the rectangle
            is outlined in white.
        show_edges: draw grid lines slightly below the surface.
        dense_grid: with ``show_edges``, draw one line per grid node instead
            of one per entry of ``DEFAULT_AXIS_TICKS``.

    Returns:
        The assembled ``go.Figure``.
    """
    nx = max(int(nx), 3)
    grid = np.linspace(0.0, 1.0, nx)
    X, Y = np.meshgrid(grid, grid, indexing="xy")
    Z = np.zeros_like(X, dtype=float)
    # Constant colour value for the solid part of the domain.
    color_field = np.full_like(Z, 0.85)
    if hole_edges is not None:
        xL, xR, yB, yT = hole_edges
        # NaN heights remove the hole nodes from the rendered surface.
        mask = (X >= xL) & (X <= xR) & (Y >= yB) & (Y <= yT)
        Z = np.where(mask, np.nan, Z)
        color_field = np.where(mask, np.nan, color_field)
    fig = go.Figure()
    fig.add_trace(
        go.Surface(
            x=X,
            y=Y,
            z=Z,
            # NaN colours (hole nodes) are replaced by 0.15 before being passed on.
            surfacecolor=np.where(np.isnan(color_field), 0.15, color_field),
            colorscale=EXCITATION_SURFACE_COLORSCALE,
            cmin=0.0,
            cmax=1.0,
            showscale=False,
            opacity=0.98,
            lighting=dict(ambient=0.85, diffuse=0.55, specular=0.1),
            hovertemplate="x=%{x:.3f}<br>y=%{y:.3f}<extra></extra>",
        )
    )
    if show_edges:
        # Grid lines are drawn just below the z = 0 surface.
        base_z = -0.012
        grid_vals = (
            np.linspace(0.0, 1.0, max(int(nx), 2))
            if dense_grid
            else np.asarray(DEFAULT_AXIS_TICKS)
        )
        # All lines share one trace; NaN entries separate the segments.
        line_x, line_y, line_z = [], [], []
        x_min_val, x_max_val = float(grid[0]), float(grid[-1])
        # Lines of constant x.
        for val in grid_vals:
            val_f = float(val)
            line_x.extend([val_f, val_f, np.nan])
            line_y.extend([x_min_val, x_max_val, np.nan])
            line_z.extend([base_z, base_z, np.nan])
        # Lines of constant y.
        for val in grid_vals:
            val_f = float(val)
            line_x.extend([x_min_val, x_max_val, np.nan])
            line_y.extend([val_f, val_f, np.nan])
            line_z.extend([base_z, base_z, np.nan])
        fig.add_trace(
            go.Scatter3d(
                x=line_x,
                y=line_y,
                z=line_z,
                mode="lines",
                line=dict(color="rgba(174,139,216,0.65)", width=1.6),
                showlegend=False,
                hoverinfo="skip",
            )
        )
    # Text labels stand in for axis ticks, since the scene axes are hidden below.
    scale_ticks = list(DEFAULT_AXIS_TICKS)
    tick_text = [f"{t:.2f}" for t in scale_ticks]
    tick_plane = -0.02
    # Labels along the x direction, placed just outside y = 0.
    fig.add_trace(
        go.Scatter3d(
            x=scale_ticks,
            y=[-0.018] * len(scale_ticks),
            z=[tick_plane] * len(scale_ticks),
            mode="text",
            text=tick_text,
            textfont=dict(color="#5F259F", size=12),
            showlegend=False,
            hoverinfo="skip",
        )
    )
    # Labels along the y direction, placed just outside x = 0.
    fig.add_trace(
        go.Scatter3d(
            x=[-0.018] * len(scale_ticks),
            y=scale_ticks,
            z=[tick_plane] * len(scale_ticks),
            mode="text",
            text=tick_text,
            textfont=dict(color="#5F259F", size=12),
            showlegend=False,
            hoverinfo="skip",
        )
    )
    if hole_edges is not None:
        xL, xR, yB, yT = hole_edges
        # Closed white outline around the hole, in the surface plane.
        fig.add_trace(
            go.Scatter3d(
                x=[xL, xR, xR, xL, xL],
                y=[yB, yB, yT, yT, yB],
                z=[0.0] * 5,
                mode="lines",
                line=dict(color="#FFFFFF", width=5),
                hoverinfo="skip",
                showlegend=False,
            )
        )
    fig.update_layout(
        title=title,
        margin=dict(l=8, r=8, t=44, b=8),
        height=620,
        template="plotly_white",
        scene=dict(
            xaxis=dict(range=[-0.05, 1.05], visible=False, backgroundcolor="#f7f3ff"),
            yaxis=dict(range=[-0.05, 1.05], visible=False, backgroundcolor="#f7f3ff"),
            # NOTE(review): the z range has equal bounds ([0.1, 0.1]) while all
            # traces sit at z <= 0 — confirm this is intended.
            zaxis=dict(range=[0.1, 0.1], visible=False, backgroundcolor="#f7f3ff"),
            aspectmode="cube",
            camera=dict(eye=dict(x=1.25, y=1.25, z=0.85)),
        ),
        dragmode="orbit",
        uirevision="geometry_surface",
    )
    return fig
def _push_geometry_plot(fig: go.Figure):
    """Send ``fig`` to the geometry preview widget if its updater is registered.

    Best-effort: a missing updater or any failure during the update is ignored.
    """
    try:
        updater_registered = hasattr(ctrl, "geometry_preview_update")
        if not updater_registered:
            return
        ctrl.geometry_preview_update(fig)
    except Exception:
        pass
def _refresh_pyvista_view():
try:
if hasattr(ctrl, "view_update"):
ctrl.view_update()
except Exception:
pass
def _build_excitation_placeholder(message: str = "Select an excitation to preview.") -> go.Figure:
    """Return an axis-free figure showing only ``message`` at its center."""
    placeholder = go.Figure()
    placeholder.add_annotation(
        text=message,
        x=0.5,
        y=0.5,
        showarrow=False,
        font={"size": 18, "color": "#5F259F"},
    )
    # Hide both axes so only the annotation remains visible.
    placeholder.update_xaxes(visible=False)
    placeholder.update_yaxes(visible=False)
    layout_options = {
        "template": "plotly_white",
        "margin": {"l": 20, "r": 20, "t": 40, "b": 20},
        "paper_bgcolor": "#ffffff",
        "plot_bgcolor": "#ffffff",
        "height": 520,
        "showlegend": False,
    }
    placeholder.update_layout(**layout_options)
    return placeholder
def _build_excitation_surface_plot(
    X: np.ndarray,
    Y: np.ndarray,
    field: np.ndarray,
    title: str,
    *,
    show_grid_lines: bool = False,
    grid_line_resolution: int | None = None,
) -> go.Figure:
    """Render an excitation field as a 3-D Plotly surface over its X/Y grid.

    Surface height is ``field`` divided by its peak magnitude and scaled to
    0.1; colour uses the raw field on a range symmetric about zero.

    Args:
        X: x coordinates; the x extent is read from ``X[0]``.
        Y: y coordinates; the y extent is read from ``Y[:, 0]``.
        field: values drawn on the surface.
        title: figure title.
        show_grid_lines: when true, draw reference grid lines and tick labels
            on a plane just below the surface.
        grid_line_resolution: number of evenly spaced grid lines per axis
            (used only when >= 2); otherwise lines follow
            ``DEFAULT_AXIS_TICKS``.

    Returns:
        The assembled ``go.Figure``.
    """
    peak = float(np.max(np.abs(field))) if field.size else 1.0
    if peak < 1e-12:
        # Flat field: avoid dividing by ~0 when normalising the height.
        peak = 1.0
    relief_height = 0.1
    relief = (field / peak) * relief_height

    fig = go.Figure()
    surface = go.Surface(
        x=X,
        y=Y,
        z=relief,
        surfacecolor=field,
        colorscale=EXCITATION_SURFACE_COLORSCALE,
        cmin=-peak,
        cmax=peak,
        showscale=False,
        opacity=0.98,
        lighting=dict(ambient=0.6, diffuse=0.6, specular=0.15),
        hovertemplate="x=%{x:.3f}<br>y=%{y:.3f}<br>z=%{z:.3f}<extra></extra>",
    )
    fig.add_trace(surface)

    # Domain extents, taken from the first row of X and first column of Y.
    xs = np.unique(np.round(X[0], decimals=6))
    ys = np.unique(np.round(Y[:, 0], decimals=6))
    x_min, x_max = float(np.min(xs)), float(np.max(xs))
    y_min, y_max = float(np.min(ys)), float(np.max(ys))
    # A zero-width axis falls back to a unit span.
    span_x = x_max - x_min if x_max != x_min else 1.0
    span_y = y_max - y_min if y_max != y_min else 1.0
    base_z = -0.02

    if show_grid_lines:
        if grid_line_resolution is not None and grid_line_resolution >= 2:
            fractions = np.linspace(0.0, 1.0, int(grid_line_resolution))
        else:
            fractions = np.asarray(DEFAULT_AXIS_TICKS)

        # All segments share one trace; NaN entries break the polyline
        # between consecutive segments.
        seg_x, seg_y, seg_z = [], [], []
        for gx in x_min + span_x * fractions:
            gx = float(gx)
            seg_x += [gx, gx, np.nan]
            seg_y += [y_min, y_max, np.nan]
            seg_z += [base_z, base_z, np.nan]
        for gy in y_min + span_y * fractions:
            gy = float(gy)
            seg_x += [x_min, x_max, np.nan]
            seg_y += [gy, gy, np.nan]
            seg_z += [base_z, base_z, np.nan]
        fig.add_trace(
            go.Scatter3d(
                x=seg_x,
                y=seg_y,
                z=seg_z,
                mode="lines",
                line=dict(color="rgba(174,139,216,0.3)", width=1),
                showlegend=False,
                hoverinfo="skip",
            )
        )

        # Tick labels sit slightly outside the domain and below the grid plane.
        tick_fractions = np.asarray(DEFAULT_AXIS_TICKS)
        ticks_x = x_min + span_x * tick_fractions
        ticks_y = y_min + span_y * tick_fractions
        labels = [f"{t:.2f}" for t in DEFAULT_AXIS_TICKS]
        label_x = x_min - 0.03 * span_x
        label_y = y_min - 0.03 * span_y
        label_z = base_z - 0.01
        label_style = dict(
            mode="text",
            text=labels,
            textfont=dict(color="#5F259F", size=12),
            showlegend=False,
            hoverinfo="skip",
        )
        fig.add_trace(
            go.Scatter3d(
                x=ticks_x,
                y=[label_y] * len(ticks_x),
                z=[label_z] * len(ticks_x),
                **label_style,
            )
        )
        fig.add_trace(
            go.Scatter3d(
                x=[label_x] * len(ticks_y),
                y=ticks_y,
                z=[label_z] * len(ticks_y),
                **label_style,
            )
        )

    # Closed rectangle outlining the domain, just below the grid plane.
    fig.add_trace(
        go.Scatter3d(
            x=[x_min, x_max, x_max, x_min, x_min],
            y=[y_min, y_min, y_max, y_max, y_min],
            z=[base_z - 0.005] * 5,
            mode="lines",
            line=dict(color="rgba(255,192,203,0.9)", width=2.5),
            showlegend=False,
            hoverinfo="skip",
        )
    )

    pad_x = 0.05 * span_x
    pad_y = 0.05 * span_y
    hidden_axis = dict(
        showgrid=False,
        showline=False,
        showticklabels=False,
        zeroline=False,
        visible=False,
        showbackground=False,
    )
    fig.update_layout(
        title=title,
        margin=dict(l=10, r=10, t=36, b=10),
        height=620,
        template="plotly_white",
        scene=dict(
            xaxis=dict(range=[x_min - pad_x, x_max + pad_x], **hidden_axis),
            yaxis=dict(range=[y_min - pad_y, y_max + pad_y], **hidden_axis),
            zaxis=dict(range=[-0.3, 0.3], **hidden_axis),
            aspectmode="cube",
            camera=dict(eye=dict(x=1.2, y=1.2, z=1.0)),
        ),
        dragmode="orbit",
        uirevision="excitation_surface",
    )
    return fig
def _push_excitation_plot(fig: go.Figure | None):
    """Send ``fig`` to the excitation preview widget, or the placeholder if falsy.

    Best-effort: exceptions raised while probing or calling
    ``ctrl.excitation_preview_update`` are swallowed.
    """
    render_fig = fig if fig else _build_excitation_placeholder()
    try:
        if not hasattr(ctrl, "excitation_preview_update"):
            return
        ctrl.excitation_preview_update(render_fig)
    except Exception:
        return
# --- Plain Square Domain Preview ---
def update_geometry_preview():
    """Push the Plotly "Square Domain" geometry preview.

    Does nothing while a simulation is running or after one has run.
    Otherwise clears the PyVista plotter, drops the cached mesh, and draws
    the domain at the selected ``nx`` (32 when no grid size is chosen; the
    dense grid is only requested when ``nx`` is set).
    """
    global current_mesh
    if state.is_running or state.simulation_has_run:
        return
    plotter.clear()
    current_mesh = None
    selected_nx = state.nx
    has_grid = selected_nx is not None
    resolution = int(selected_nx) if has_grid else 32
    _push_geometry_plot(
        _build_square_domain_plot(resolution, "Square Domain", dense_grid=has_grid)
    )
# --- Plain Square Domain (Hole) Preview ---
def update_geometry_hole_preview():
    """Push the Plotly "Square Metallic Body" geometry preview.

    Does nothing while a simulation is running or after one has run.
    The body's edge length and centre come from state; if any of them cannot
    be converted to float, the defaults 0.2 and (0.5, 0.5) are used.
    """
    global current_mesh
    if state.is_running or state.simulation_has_run:
        return
    plotter.clear()
    current_mesh = None
    selected_nx = state.nx
    has_grid = selected_nx is not None
    resolution = int(selected_nx) if has_grid else 32
    try:
        edge = float(state.hole_size_edge)
        centre_x = float(state.hole_center_x)
        centre_y = float(state.hole_center_y)
    except Exception:
        edge, centre_x, centre_y = 0.2, 0.5, 0.5
    snap_to_grid = bool(state.hole_snap)
    hole_edges = _compute_hole_edges(resolution, centre_x, centre_y, edge, snap=snap_to_grid)
    _push_geometry_plot(
        _build_square_domain_plot(
            resolution, "Square Metallic Body", hole_edges, dense_grid=has_grid
        )
    )
# Helper: map normalized [0,1] to nearest node index on an nx×ny grid
def _nearest_node_index(x: float, y: float, nx: int, ny: int | None = None):
ny = ny or nx
i = int(round(float(x) * (nx - 1)))
j = int(round(float(y) * (ny - 1)))
i = max(0, min(nx - 1, i))
j = max(0, min(ny - 1, j))
return i, j
_SAMPLE_PAIR_RE = re.compile(r"\(\s*([-+]?\d*\.?\d+)\s*,\s*([-+]?\d*\.?\d+)\s*\)")
def _snap_samples_to_grid(sample_str: str, nx: int):
"""Convert normalized sample positions to nearest integer grid indices.
Returns (grid_points_string, message) where message is empty when no snapping occurred."""
if not sample_str or not str(sample_str).strip():
return "", ""
matches = _SAMPLE_PAIR_RE.findall(str(sample_str))
if not matches:
return "", "Enter sample position(s) as (x, y) pairs in [0,1] x [0,1]."
nx_int = int(nx)
denom = float(max(nx_int - 1, 1))
tokens = []
info_lines = []
def _normalize_value(val: float) -> float:
threshold = 1.0 + 1e-9
if abs(val) <= threshold:
return max(0.0, min(1.0, val))
max_index = float(nx_int - 1)
return max(0.0, min(max_index, val)) / denom if denom else 0.0
for raw_x, raw_y in matches:
try:
x_val = float(raw_x)
y_val = float(raw_y)
except ValueError:
continue
x_norm = _normalize_value(x_val)
y_norm = _normalize_value(y_val)
ix, iy = _nearest_node_index(x_norm, y_norm, nx_int)
tokens.append(f"({ix}, {iy})")
snapped_x = ix / denom
snapped_y = iy / denom
changed = (
abs(x_norm - snapped_x) > 1e-9
or abs(y_norm - snapped_y) > 1e-9
)
descriptor = "adjusted" if changed else "aligned"
info_lines.append(
f"Input ({x_val:.3f}, {y_val:.3f}) {descriptor} to nearest Position ({snapped_x:.3f}, {snapped_y:.3f})."
)
grid_str = ", ".join(tokens)
message = "\n".join(info_lines)
return grid_str, message
def _update_sim_monitor_points():
    """Recompute the snapped time-series monitor points from ``state.timeseries_points``.

    Writes ``state.timeseries_gridpoints`` and ``state.timeseries_point_info``.
    Both are cleared for blank input; when no grid size is selected the
    grid points are cleared and a hint is shown instead.
    """
    raw_points = state.timeseries_points
    if not raw_points or not str(raw_points).strip():
        state.timeseries_gridpoints = ""
        state.timeseries_point_info = ""
        return
    grid_size = state.nx
    if grid_size is None:
        state.timeseries_gridpoints = ""
        state.timeseries_point_info = "Select a grid size (nx) to compute the nearest monitor positions."
        return
    grid_points, note = _snap_samples_to_grid(raw_points, int(grid_size))
    state.timeseries_gridpoints = grid_points
    state.timeseries_point_info = note or ""
def _update_qpu_sample_slot(slot_index: int):
    """Snap the sample text of one QPU monitor slot to grid indices.

    Slot 1 uses the un-suffixed state keys; other slots use a ``_<n>``
    suffix. Blank input clears the slot's grid points and info; a missing
    grid size shows a hint. After a successful snap, the QPU plots are
    hidden when the backend type is "QPU".
    """
    tail = f"_{slot_index}" if slot_index != 1 else ""
    sample_attr = "qpu_monitor_samples" + tail
    grid_attr = "qpu_monitor_gridpoints" + tail
    info_attr = "qpu_monitor_sample_info" + tail

    raw_samples = getattr(state, sample_attr, "")
    grid_size = state.nx
    if not raw_samples or not str(raw_samples).strip():
        setattr(state, grid_attr, "")
        setattr(state, info_attr, "")
        return
    if grid_size is None:
        setattr(state, info_attr, "Select a grid size (nx) to compute the nearest grid position.")
        setattr(state, grid_attr, "")
        return

    grid_points, note = _snap_samples_to_grid(raw_samples, int(grid_size))
    setattr(state, grid_attr, grid_points)
    setattr(state, info_attr, note or "")
    if state.backend_type == "QPU":
        _hide_qpu_plots()
def _hide_qpu_plots():
    """Hide both QPU time-series plots and reset the position filter.

    Marks both plots as not ready, applies a ``display: none`` style to
    each, and restores the position options/filter to "All positions".
    """
    hidden_css = "display: none; width: 900px; height: 660px; margin: 0 auto;"
    all_positions = "All positions"
    state.qpu_ts_ready = False
    state.qpu_plot_style = hidden_css
    state.qpu_ts_other_ready = False
    state.qpu_other_plot_style = hidden_css
    state.qpu_plot_position_options = [all_positions]
    state.qpu_plot_position_filter = all_positions
def update_initial_state_preview():
    """Refresh the geometry / excitation preview to match the current state.

    Branches, in order:
      1. Simulation running: clear the PyVista plotter and stop.
      2. No geometry selected: push placeholder figures and stop.
      3. No run yet and no excitation type chosen: geometry-only preview.
      4. No run yet, excitation chosen: Plotly surface of the initial state.
      5. After a run: PyVista mesh of the initial state with point picking.

    Parameter problems are reported through ``state.error_message`` rather
    than raised.
    """
    global current_mesh
    # Don't render any preview while running
    if state.is_running:
        plotter.clear()
        _refresh_pyvista_view()
        return
    # If no geometry selected, clear and stop
    if not state.geometry_selection:
        plotter.clear()
        _push_geometry_plot(_build_geometry_placeholder("Select a geometry to preview."))
        _push_excitation_plot(None)
        _refresh_pyvista_view()
        return
    # Geometry-only previews before initial state selection
    if not state.simulation_has_run and not state.dist_type:
        if state.geometry_selection == "Square Domain":
            update_geometry_preview()
            _push_excitation_plot(None)
            return
        if state.geometry_selection == "Square Metallic Body":
            update_geometry_hole_preview()
            _push_excitation_plot(None)
            return
        _push_geometry_plot(_build_geometry_placeholder("Preview not available for this geometry yet."))
        _push_excitation_plot(None)
        return
    # Plotly preview for excitation surface before any simulation run
    if not state.simulation_has_run:
        plotter.clear()
        current_mesh = None
        state.error_message = ""
        # Fallback resolution when no grid size has been selected
        preview_n = 128
        nx_sel = state.nx
        try:
            slider_n = None
            if nx_sel is not None:
                slider_n = int(nx_sel)
            grid_n = slider_n if slider_n is not None else preview_n
            # Grid lines are shown in both cases; only their spacing differs
            # (default ticks vs. one line per selected grid node).
            if slider_n is None:
                show_grid_lines = True
                grid_line_resolution = None
            else:
                show_grid_lines = True
                grid_line_resolution = max(slider_n, 2)
            # Never preview on fewer than 8 nodes per side
            grid_n = max(grid_n, 8)
            if state.dist_type == "Delta":
                ix, iy = _nearest_node_index(float(state.impulse_x), float(state.impulse_y), grid_n)
                full_state = create_impulse_state((grid_n, grid_n), (ix, iy))
                title = "Delta Excitation"
            elif state.dist_type == "Gaussian":
                ix, iy = _nearest_node_index(float(state.mu_x), float(state.mu_y), grid_n)
                # Sigma is given in normalized units; scale to grid cells and keep it positive
                sx = max(float(state.sigma_x) * (grid_n - 1), 1e-9)
                sy = max(float(state.sigma_y) * (grid_n - 1), 1e-9)
                full_state = create_gaussian_state((grid_n, grid_n), (ix, iy), (sx, sy))
                title = "Gaussian Excitation"
            else:
                # Unknown excitation type: show the placeholder instead
                _push_excitation_plot(None)
                _refresh_pyvista_view()
                return
            # Only the first grid_n*grid_n entries of the state vector are plotted
            initial_grid = full_state[: grid_n * grid_n].reshape(grid_n, grid_n)
            denom = float(max(grid_n - 1, 1))
            coords = np.arange(grid_n) / denom
            X, Y = np.meshgrid(coords, coords)
            fig = _build_excitation_surface_plot(
                X,
                Y,
                initial_grid,
                f"{title} (nx={grid_n})",
                show_grid_lines=show_grid_lines,
                grid_line_resolution=grid_line_resolution,
            )
            _push_excitation_plot(fig)
        except ValueError as e:
            state.error_message = f"Parameter Error: {e}"
            _push_excitation_plot(_build_excitation_placeholder("Check excitation parameters."))
        except Exception as e:
            state.error_message = f"An unexpected error occurred: {e}"
            _push_excitation_plot(_build_excitation_placeholder("Unable to render preview."))
        finally:
            # Runs on every exit from the try block, including the early return above
            _refresh_pyvista_view()
        return
    # Use PyVista preview only after simulations have produced data
    plotter.clear()
    state.error_message = ""
    # Default to a high-resolution 128x128 preview grid
    preview_n = 128
    nx_sel = state.nx
    # Show grid edges only when a mesh size is selected
    show_grid_edges = nx_sel is not None
    try:
        grid_n = int(nx_sel) if nx_sel is not None else preview_n
        if state.dist_type == "Delta":
            ix, iy = _nearest_node_index(float(state.impulse_x), float(state.impulse_y), grid_n)
            full_state = create_impulse_state((grid_n, grid_n), (ix, iy))
        elif state.dist_type == "Gaussian":
            ix, iy = _nearest_node_index(float(state.mu_x), float(state.mu_y), grid_n)
            sx = max(float(state.sigma_x) * (grid_n - 1), 1e-9)
            sy = max(float(state.sigma_y) * (grid_n - 1), 1e-9)
            full_state = create_gaussian_state((grid_n, grid_n), (ix, iy), (sx, sy))
        else:
            # Unknown excitation type: leave the cleared plotter as is (no view refresh here)
            return
        # Build preview mesh using the correct grid size
        initial_grid = full_state[: grid_n * grid_n].reshape(grid_n, grid_n)
        denom = float(max(grid_n - 1, 1))
        x, y = np.arange(grid_n) / denom, np.arange(grid_n) / denom
        X, Y = np.meshgrid(x, y)
        max_abs = float(np.max(np.abs(initial_grid))) if initial_grid.size else 1.0
        if max_abs < 1e-12:
            # Flat field: avoid dividing by ~0 when normalising the height
            max_abs = 1.0
        height_scale = 0.15
        Z = (initial_grid / max_abs) * height_scale
        mesh = pv.StructuredGrid()
        mesh.points = np.c_[X.ravel(), Y.ravel(), Z.ravel()]
        mesh.dimensions = (grid_n, grid_n, 1)
        mesh['scalars'] = initial_grid.ravel()
        # Cached so update_value_display can look up picked points
        current_mesh = mesh
        plotter.add_mesh(
            mesh,
            scalars='scalars',
            cmap="Blues",
            show_scalar_bar=False,
            show_edges=show_grid_edges,
            edge_color='grey',
            line_width=0.5,
        )
        # Mirror the geometry plotter's pink scaling grid so users can gauge coordinates
        plotter.show_grid(
            bounds=(0.0, 1.0, 0.0, 1.0, 0.0, 0.0),
            xtitle="x (0–1)",
            ytitle="y (0–1)",
            ztitle=" ",
            color="#AE8BD8",
        )
        _add_dotted_unit_grid(
            plotter,
            ticks=(0.0, 0.25, 0.5, 0.75, 1.0),
            segments=48,
            gap_ratio=0.6,
            color="#AE8BD8",
            line_width=1,
        )
        plotter.add_axes()
        # Re-arm point picking: drop any previous picker (errors ignored),
        # then attach update_value_display as the pick callback.
        try:
            plotter.disable_picking()
        except Exception:
            pass
        plotter.enable_point_picking(callback=update_value_display, show_message=False)
        plotter.view_isometric()
        # Parallel projection is optional; ignore failures when setting it
        try:
            plotter.camera.parallel_projection = True
        except Exception:
            pass
        _refresh_pyvista_view()
    except ValueError as e:
        state.error_message = f"Parameter Error: {e}"
    except Exception as e:
        state.error_message = f"An unexpected error occurred: {e}"
@state.change("geometry_selection")
def handle_geometry_add(geometry_selection, **kwargs):
    """React to a geometry selection change.

    "Add" opens the upload dialog and clears the selection without touching
    the preview. Any other value refreshes the preview; the unselect values
    (None, "" and "None") are first normalized to None. The workflow
    highlights are re-applied in every case.
    """
    if geometry_selection == "Add":
        state.show_upload_dialog = True
        state.geometry_selection = None
        _apply_workflow_highlights(_determine_workflow_step())
        return
    if geometry_selection in (None, "", "None"):
        # Normalize unselect options to None
        state.geometry_selection = None
    update_initial_state_preview()
    _apply_workflow_highlights(_determine_workflow_step())
@state.change("uploaded_file_info")
def handle_file_upload(uploaded_file_info, **kwargs):
    """Acknowledge a selected upload: close the dialog and show a status message.

    The file itself is not processed here; only its name is reported.
    """
    if not uploaded_file_info:
        return
    file_name = uploaded_file_info.get("name", "unknown file")
    print(f"File selected (dummy upload): {file_name}")
    state.show_upload_dialog = False
    state.upload_status_message = f"File '{file_name}' uploaded."
    state.show_upload_status = True
def log_message(message, level="INFO"):
    """Append a timestamped, levelled entry to ``state.console_logs``.

    The log is capped to its most recent 50000 characters to prevent
    performance issues.

    Args:
        message: text to log.
        level: severity label placed in the entry (default "INFO").
    """
    max_chars = 50000
    timestamp = datetime.now().strftime("%H:%M:%S")
    new_entry = f"[{timestamp}] [{level}] {message}\n"
    # Treat a missing/None log as empty (same guard as log_to_console) and
    # write the state once; slicing is a no-op while under the cap.
    logs = (state.console_logs or "") + new_entry
    state.console_logs = logs[-max_chars:]
def log_to_console(message):
    """Append a timestamped line to ``state.console_output`` (None counts as empty)."""
    stamp = datetime.now().strftime("%H:%M:%S")
    existing = state.console_output or ""
    state.console_output = existing + f"[{stamp}] {message}\n"
def update_excitation_info_message():
    """Report how the excitation centre snaps to the selected grid.

    Writes ``state.excitation_info_message``. The message is empty when no
    grid size or excitation type is set, when the type is neither "Delta"
    nor "Gaussian", or when any value fails to convert.
    """
    if state.nx is None or state.dist_type is None:
        state.excitation_info_message = ""
        return
    # State keys holding the excitation centre for each supported type.
    centre_keys = {
        "Delta": ("impulse_x", "impulse_y"),
        "Gaussian": ("mu_x", "mu_y"),
    }
    try:
        nx = int(state.nx)
        denom = float(max(nx - 1, 1))
        keys = centre_keys.get(state.dist_type)
        if keys is None:
            state.excitation_info_message = ""
            return
        x_in = float(getattr(state, keys[0]))
        y_in = float(getattr(state, keys[1]))
        ix, iy = _nearest_node_index(x_in, y_in, nx)
        x_snapped = ix / denom
        y_snapped = iy / denom
        moved = abs(x_in - x_snapped) > 1e-9 or abs(y_in - y_snapped) > 1e-9
        descriptor = "adjusted" if moved else "aligned"
        state.excitation_info_message = (
            f"Input ({x_in:.3f}, {y_in:.3f}) {descriptor} to nearest Position "
            f"({x_snapped:.3f}, {y_snapped:.3f})."
        )
    except Exception:
        state.excitation_info_message = ""
@state.change("nx_slider_index")
def on_slider_index_change(nx_slider_index, **kwargs):
    """Translate the grid-size slider position into ``state.nx``.

    ``state.nx`` becomes the matching ``GRID_SIZES`` entry, or None when the
    index is None or cannot be resolved. The snapping message is refreshed
    afterwards.
    """
    chosen = None
    if nx_slider_index is not None:
        try:
            chosen = int(GRID_SIZES[int(nx_slider_index)])
        except Exception:
            chosen = None
    state.nx = chosen
    update_excitation_info_message()
@state.change("nx", "T", "dist_type", "impulse_x", "impulse_y", "mu_x", "mu_y", "sigma_x", "sigma_y", "coeff_permittivity", "coeff_permeability")
def on_input_parameter_change(**kwargs):
    """React to a change of any simulation input parameter.

    Ignored while a simulation is running. Otherwise refreshes the snapping
    message and, for the "QPU" backend, hides both QPU plots. After a run
    only the run button text changes to "Re-run!"; before a run the initial
    preview is refreshed when ``kwargs`` contains a preview-related key.

    NOTE(review): this assumes ``kwargs`` holds only the keys that changed —
    confirm against how the state object invokes change callbacks.
    """
    if state.is_running:
        return
    update_excitation_info_message()
    if state.backend_type == "QPU":
        hidden_css = "display: none; width: 900px; height: 660px; margin: 0 auto;"
        state.qpu_ts_ready = False
        state.qpu_plot_style = hidden_css
        state.qpu_ts_other_ready = False
        state.qpu_other_plot_style = hidden_css
    if state.simulation_has_run:
        # Keep current results; only signal that a re-run is needed.
        state.run_button_text = "Re-run!"
        return
    preview_params = {"nx", "dist_type", "impulse_x", "impulse_y", "mu_x", "mu_y", "sigma_x", "sigma_y"}
    if preview_params.intersection(kwargs):
        update_initial_state_preview()
@state.change("output_type", "timeseries_field", "timeseries_points")
def on_output_config_change(**kwargs):
    """Re-snap the monitor points and, once a simulation has run, regenerate the plot."""
    _update_sim_monitor_points()
    if not state.simulation_has_run:
        return
    generate_plot()
@state.change("timeseries_points")
def on_timeseries_points_text_change(**kwargs):
    """Re-snap the time-series monitor points whenever their text changes."""
    _update_sim_monitor_points()
@state.change("surface_field")
def on_surface_field_change(surface_field, **kwargs):
    """Redraw the surface plot when the field changes, if results are being shown as a surface."""
    showing_surface = state.simulation_has_run and state.output_type == "Surface Plot"
    if showing_surface:
        redraw_surface_plot()
@state.change("time_val")
def on_time_change(time_val, **kwargs):
    """Redraw the surface plot for the new time, if results are being shown as a surface."""
    if state.simulation_has_run and state.output_type == "Surface Plot":
        redraw_surface_plot()
def update_value_display(point):
    """Show the value of the mesh vertex nearest to a picked point.

    Replaces the "value_text" overlay in the lower-left corner. Before a run
    on a unit-square mesh only position and value are shown; otherwise the
    grid index is shown as well, plus the current time after a run.

    Args:
        point: picked coordinates passed to ``current_mesh.find_closest_point``.
    """
    if current_mesh is None:
        return
    try:
        plotter.remove_actor("value_text")
    except Exception:
        pass
    vertex = current_mesh.find_closest_point(point)
    if vertex == -1:
        return

    # Sample value and coordinates at the closest vertex
    if 'scalars' in current_mesh.array_names:
        value = current_mesh['scalars'][vertex]
    else:
        value = 0.0
    px, py, pz = current_mesh.points[vertex]
    px, py = float(px), float(py)

    # Unit-square meshes carry normalized coordinates; others are treated
    # as integer-grid coordinates.
    xmin, xmax, ymin, ymax, _, _ = current_mesh.bounds
    on_unit_square = xmax <= 1.00001 and ymax <= 1.00001

    if not state.simulation_has_run and on_unit_square:
        text = f"Position: ({px:.3f}, {py:.3f})\nValue: {value:.3e}"
    else:
        nx_val = int(state.nx)
        denom = max(float(nx_val - 1), 1.0)
        if on_unit_square:
            raw_i, raw_j = px * denom, py * denom
            frac_x, frac_y = px, py
        else:
            raw_i, raw_j = px, py
            frac_x, frac_y = px / denom, py / denom
        ix = max(0, min(int(round(raw_i)), nx_val - 1))
        iy = max(0, min(int(round(raw_j)), nx_val - 1))
        x_code = max(0.0, min(1.0, frac_x))
        y_code = max(0.0, min(1.0, frac_y))
        if state.simulation_has_run:
            t_now = float(state.time_val)
            text = f"Index: ({ix}, {iy}) | Position: ({x_code:.3f}, {y_code:.3f})\nTime: {t_now:.2f}s\nValue: {value:.3e}"
        else:
            text = f"Index: ({ix}, {iy}) | Position: ({x_code:.3f}, {y_code:.3f})\nValue: {value:.3e}"

    plotter.add_text(text, name="value_text", position="lower_left", color="black", font_size=10)
    ctrl.view_update()
# Attach update_value_display as the plotter's point-pick callback. Any
# picker that is already active is disabled first; errors from that call
# are ignored.
# NOTE(review): indentation is not recoverable in this view — this reads as
# module-level wiring, but confirm it is not the tail of update_value_display.
try:
    plotter.disable_picking()
except Exception:
    pass
plotter.enable_point_picking(callback=update_value_display, show_message=False)
def export_vtk():
    """Export the current surface mesh as a .vtp browser download.

    The mesh is written to a temporary file, base64-encoded and handed to
    the client through ``_server.js_call("utils", "download", ...)``. The
    outcome is reported via ``state.export_status_message`` and the status
    snackbar is always shown. The temporary file is removed even when the
    export fails.
    """
    if current_mesh is None:
        state.export_status_message = "No mesh to export."
        state.show_export_status = True
        return
    tmp_path = None
    try:
        field = state.surface_field or "Ez"
        nx = int(state.nx)
        suffix = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"surface_{field}_nx{nx}_{suffix}.vtp"
        # Reserve a unique path, then close the handle before PyVista writes
        # to it: an open NamedTemporaryFile cannot be reopened by name on
        # every platform.
        with tempfile.NamedTemporaryFile(suffix=".vtp", delete=False) as tmp:
            tmp_path = Path(tmp.name)
        current_mesh.save(str(tmp_path))
        # Encode content as base64 for browser download
        content_b64 = base64.b64encode(tmp_path.read_bytes()).decode("ascii")
        # Trigger browser download via JavaScript
        _server.js_call("utils", "download", filename, f"data:application/octet-stream;base64,{content_b64}")
        state.export_status_message = f"Exported VTK to {filename}"
    except Exception as e:
        state.export_status_message = f"Export failed: {e}"
    finally:
        state.show_export_status = True
        if tmp_path is not None:
            try:
                tmp_path.unlink(missing_ok=True)
            except OSError:
                # Cleanup is best-effort; the export result is already reported.
                pass
def export_vtk_all_frames():
    """Export one .vtp file per time frame of the selected field as a zip download.

    Frames are triangulated with ``delaunay_2d`` and written inside a
    temporary directory that is removed on exit, including on failure. The
    outcome is reported via ``state.export_status_message`` and the status
    snackbar is always shown.
    """
    try:
        if not state.simulation_has_run:
            raise ValueError("Run a simulation before exporting all frames.")
        field = state.surface_field or "Ez"
        frames = data_frames.get(field)
        if not frames:
            raise ValueError(f"No frames available for {field}.")
        if snapshot_times is None:
            raise ValueError("Snapshot times are unavailable.")
        nx = int(state.nx)
        suffix = datetime.now().strftime("%Y%m%d_%H%M%S")
        zip_filename = f"vtk_sequence_{field}_nx{nx}_{suffix}.zip"
        import zipfile

        times = np.asarray(snapshot_times)
        # The planar coordinates are the same for every frame.
        x_flat = X_grids[field].ravel()
        y_flat = Y_grids[field].ravel()
        exported = 0
        with tempfile.TemporaryDirectory() as work_dir:
            work = Path(work_dir)
            zip_path = work / zip_filename
            with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
                for i, (z_data, t) in enumerate(zip(frames, times)):
                    points = np.c_[x_flat, y_flat, z_data.ravel() * z_scale]
                    mesh = pv.PolyData(points).delaunay_2d()
                    mesh["scalars"] = z_data.ravel()
                    fname = f"{field}_frame_{i:04d}_t{t:.3f}s.vtp"
                    frame_path = work / fname
                    mesh.save(str(frame_path))
                    zf.write(frame_path, arcname=fname)
                    # Drop each frame once archived to keep disk usage low.
                    frame_path.unlink()
                    exported += 1
            content = zip_path.read_bytes()
        # Encode content as base64 for browser download
        content_b64 = base64.b64encode(content).decode("ascii")
        _server.js_call("utils", "download", zip_filename, f"data:application/zip;base64,{content_b64}")
        # Report what was actually written; zip() stops at the shorter of
        # frames and snapshot times.
        state.export_status_message = f"Exported {exported} frames to {zip_filename}"
    except Exception as e:
        state.export_status_message = f"Export failed: {e}"
    finally:
        state.show_export_status = True
def export_mp4():
    """Export the surface-plot animation of the selected field as an MP4 download.

    Frames are rendered with a dedicated off-screen plotter (1280x720, 20
    fps), the file is base64-encoded and handed to the client through
    ``_server.js_call``. The off-screen plotter is closed and the temporary
    file removed even when the export fails. The outcome is reported via
    ``state.export_status_message`` and the status snackbar is always shown.
    """
    temp_path = None
    movie_plotter = None
    try:
        if not state.simulation_has_run:
            raise ValueError("Run a simulation before exporting MP4.")
        field = state.surface_field or "Ez"
        frames = data_frames.get(field)
        if not frames:
            raise ValueError(f"No frames available for {field}.")
        if len(frames) < 2:
            raise ValueError("Only one frame available; increase T or simulation steps.")
        nx = int(state.nx)
        suffix = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"surface_anim_{field}_nx{nx}_{suffix}.mp4"
        # Reserve a unique path for the movie; the handle is closed right away.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp:
            temp_path = Path(tmp.name)
        # Build with a dedicated off-screen plotter at a macro-block friendly size
        movie_plotter = pv.Plotter(off_screen=True, window_size=(1280, 720))
        X = X_grids[field]
        Y = Y_grids[field]

        def _surface_mesh(z_data):
            """Triangulated surface for one frame, with raw values as scalars."""
            points = np.c_[X.ravel(), Y.ravel(), z_data.ravel() * z_scale]
            surface = pv.PolyData(points).delaunay_2d()
            surface['scalars'] = z_data.ravel()
            return surface

        def _show(surface):
            """Add a frame mesh to the movie plotter with the shared styling."""
            movie_plotter.add_mesh(
                surface,
                scalars='scalars',
                clim=surface_clims[field],
                cmap="RdBu",
                show_scalar_bar=False,
                show_edges=True,
                edge_color='grey',
                line_width=0.5,
            )

        # Initial mesh from first frame
        mesh = _surface_mesh(frames[0])
        _show(mesh)
        movie_plotter.add_axes()
        # Use similar camera if available, else default
        try:
            if hasattr(plotter, 'camera_position') and plotter.camera_position:
                movie_plotter.camera_position = plotter.camera_position
            else:
                movie_plotter.view_isometric()
        except Exception:
            movie_plotter.view_isometric()
        movie_plotter.open_movie(str(temp_path), framerate=20)
        for z_data in frames:
            if mesh.n_points != z_data.size:
                # Rebuild mesh if topology changes (unlikely here)
                mesh = _surface_mesh(z_data)
                movie_plotter.clear()
                _show(mesh)
            else:
                # Same topology: update heights and scalars in place.
                mesh.points[:, 2] = z_data.ravel() * z_scale
                mesh['scalars'] = z_data.ravel()
            movie_plotter.render()
            movie_plotter.write_frame()
        # Close before reading the file back, and mark it closed so the
        # cleanup below does not close it a second time.
        movie_plotter.close()
        movie_plotter = None
        # Encode content as base64 for browser download
        content_b64 = base64.b64encode(temp_path.read_bytes()).decode("ascii")
        _server.js_call("utils", "download", filename, f"data:video/mp4;base64,{content_b64}")
        state.export_status_message = f"Exported MP4 to {filename}"
    except Exception as e:
        state.export_status_message = f"Export failed: {e}"
    finally:
        state.show_export_status = True
        if movie_plotter is not None:
            try:
                movie_plotter.close()
            except Exception:
                pass
        if temp_path is not None:
            try:
                temp_path.unlink(missing_ok=True)
            except OSError:
                # Cleanup is best-effort; the export result is already reported.
                pass
# --- Small Plot under Meshing: Qubit requirement vs Grid Size ---
def build_qubit_plot(grid_size: int):
    """Plot total qubits against grid size and mark the current selection.

    The qubit count is ``2 * ceil(log2(nx)) + 4``, drawn as a line over the
    grid sizes 16..512 with a marker at ``grid_size``.

    Args:
        grid_size: currently selected grid size (values below 1 count as 1
            for the qubit computation).

    Returns:
        The assembled ``go.Figure``.
    """
    sizes = np.array([16, 32, 64, 128, 256, 512])
    qubits = 2 * np.ceil(np.log2(sizes)).astype(int) + 4
    selected_qubits = int(2 * np.ceil(np.log2(max(1, int(grid_size)))) + 4)

    fig = go.Figure()
    # Match app.py: x = grid size, y = total qubits
    curve = go.Scatter(
        x=sizes,
        y=qubits,
        mode='lines',
        name='Total Qubits',
        line=dict(color='#7A3DB5', width=3),
    )
    marker = go.Scatter(
        x=[grid_size],
        y=[selected_qubits],
        mode='markers',
        marker=dict(size=10, color='#5F259F'),
        name='Current Selection',
    )
    fig.add_trace(curve)
    fig.add_trace(marker)

    x_lo, x_hi = int(sizes.min()), int(sizes.max())
    y_lo = int(qubits.min())
    y_hi = int(max(qubits.max(), selected_qubits))
    grid_colors = dict(gridcolor='rgba(95,37,159,0.1)', zerolinecolor='rgba(95,37,159,0.3)')
    fig.update_xaxes(
        range=[x_lo - 8, x_hi + 8],
        tickmode='array',
        tickvals=sizes,
        ticktext=[str(v) for v in sizes],
        title_text="Grid Size (nx)",
        **grid_colors,
    )
    fig.update_yaxes(
        range=[y_lo - 1, y_hi + 1],
        dtick=1,
        title_text="Total Qubits (nq)",
        **grid_colors,
    )
    fig.update_layout(
        margin=dict(l=30, r=10, t=10, b=30),
        autosize=True,
        legend=dict(orientation='h', yanchor='bottom', y=1.02, xanchor='right', x=1),
        font=dict(color='#1A1A1A'),
        paper_bgcolor='#FFFFFF',
        plot_bgcolor='#FFFFFF',
        colorway=['#5F259F', '#7A3DB5', '#AE8BD8', '#5F259F'],
    )
    return fig
# --- Plotly Theme (Synopsys-aligned) ---
def _install_synopsys_plotly_theme():
    """Register the "syn_white" Plotly template and make it the default.

    The template is built from "plotly_white" with the purple colourway,
    white backgrounds, a horizontal legend above the plot and custom axis
    styling.
    """
    theme = go.layout.Template(pio.templates["plotly_white"])  # build from existing template

    layout_overrides = {
        "font": {
            "family": "Inter, Segoe UI, Roboto, Helvetica, Arial, sans-serif",
            "size": 13,
            "color": "#1A1A1A",
        },
        "paper_bgcolor": "#FFFFFF",
        "plot_bgcolor": "#FFFFFF",
        "colorway": ["#5F259F", "#7A3DB5", "#AE8BD8", "#1f77b4", "#ff7f0e", "#2ca02c", "#d62728"],
        "hoverlabel": {"bgcolor": "#FFFFFF", "bordercolor": "#5F259F", "font": {"color": "#1A1A1A"}},
        "legend": {"orientation": "h", "x": 1, "xanchor": "right", "y": 1.02, "yanchor": "bottom", "title_text": ""},
        "margin": {"l": 40, "r": 20, "t": 40, "b": 40},
    }
    x_axis_style = {
        "showgrid": True,
        "gridcolor": "rgba(95,37,159,0.1)",
        "zeroline": False,
        "linecolor": "rgba(0,0,0,.2)",
        "ticks": "outside",
        "tickformat": ".2f",
    }
    y_axis_style = {
        "showgrid": True,
        "gridcolor": "rgba(95,37,159,0.1)",
        "zeroline": True,
        "zerolinecolor": "rgba(0,0,0,.25)",
        "linecolor": "rgba(0,0,0,.2)",
        "ticks": "outside",
        "tickformat": ".3g",
    }

    theme.layout.update(**layout_overrides)
    theme.layout.xaxis.update(**x_axis_style)
    theme.layout.yaxis.update(**y_axis_style)

    pio.templates["syn_white"] = theme
    pio.templates.default = "syn_white"


_install_synopsys_plotly_theme()
# NEW: controller to add a QPU monitor config row (moved before UI so it's defined at layout build time)
def qpu_add_monitor_config():
    """Append a default ("Ez", "(8, 8)") row to ``state.qpu_monitor_configs``.

    Best-effort: any exception is swallowed and the state is left as it was.
    """
    try:
        rows = list(state.qpu_monitor_configs or [])
        state.qpu_monitor_configs = rows + [_new_monitor_cfg("Ez", "(8, 8)")]
    except Exception:
        return
# NEW: controller to remove a QPU monitor config row by index
def qpu_remove_monitor_config(index):
    """Remove the QPU monitor config row at ``index``.

    An out-of-range index leaves the list untouched. When the last row is
    removed, a default row (id 1, "Ez", "(8, 8)") is put back and the id
    counter is raised to at least 1. Best-effort: exceptions are swallowed.
    """
    global qpu_cfg_id_counter
    try:
        row = int(index)
        rows = list(state.qpu_monitor_configs or [])
        if not 0 <= row < len(rows):
            return
        del rows[row]
        if not rows:
            rows = [{"id": 1, "field": "Ez", "points": "(8, 8)"}]
            # ensure counter is at least the last id
            try:
                qpu_cfg_id_counter = max(qpu_cfg_id_counter, 1)
            except Exception:
                pass
        state.qpu_monitor_configs = rows
    except Exception:
        pass


# Expose remover on controller for use with template args
ctrl.qpu_remove_monitor_config = qpu_remove_monitor_config
# NEW: controllers to update per-row Field and Gridpoints selections
def qpu_set_monitor_field(index, value):
    """Set the "field" of the QPU monitor config row at ``index``.

    ``value`` is stringified and stripped (None becomes "Ez"). An
    out-of-range index is ignored. Best-effort: exceptions are swallowed.
    """
    try:
        row = int(index)
        field_name = "Ez" if value is None else str(value).strip()
        rows = list(state.qpu_monitor_configs or [])
        if not 0 <= row < len(rows):
            return
        rows[row]["field"] = field_name
        state.qpu_monitor_configs = rows
    except Exception:
        return
def qpu_set_monitor_points(index, value):
    """Set the "points" text of the QPU monitor config row at ``index``.

    ``value`` is stringified (None becomes ""). An out-of-range index is
    ignored. Best-effort: exceptions are swallowed.
    """
    try:
        row = int(index)
        points_text = "" if value is None else str(value)
        rows = list(state.qpu_monitor_configs or [])
        if not 0 <= row < len(rows):
            return
        rows[row]["points"] = points_text
        state.qpu_monitor_configs = rows
    except Exception:
        return


ctrl.qpu_set_monitor_field = qpu_set_monitor_field
ctrl.qpu_set_monitor_points = qpu_set_monitor_points
# NEW: controller to immediately set the component filter and refresh the chart
def qpu_set_plot_filter(value):
    """Store the component filter (None becomes "All") and refresh the QPU figures.

    Best-effort: exceptions are swallowed.
    """
    try:
        state.qpu_plot_filter = "All" if value is None else str(value)
        _refresh_qpu_plot_figures()
    except Exception:
        return


ctrl.qpu_set_plot_filter = qpu_set_plot_filter
def qpu_set_plot_position_filter(value):
    """Store the position filter (None becomes "All positions") and refresh the QPU figures.

    Best-effort: exceptions are swallowed.
    """
    try:
        state.qpu_plot_position_filter = "All positions" if value is None else str(value)
        _refresh_qpu_plot_figures()
    except Exception:
        return


ctrl.qpu_set_plot_position_filter = qpu_set_plot_position_filter
# Handlers for primitive additional monitor slots (2..5)
def qpu_add_monitor_slot():
    """Activate the next additional QPU monitor slot (at most four extra slots).

    The new slot's field is reset to "Ez", its sample, info and grid-point
    texts are cleared, and the QPU plots are hidden. Best-effort: exceptions
    are swallowed.
    """
    try:
        active = int(state.qpu_monitor_count or 0)
        if active >= 4:
            return
        active += 1
        state.qpu_monitor_count = active
        # Additional slots are numbered from 2 upwards.
        slot = 1 + active
        defaults = (
            ("qpu_field_components", "Ez"),
            ("qpu_monitor_samples", ""),
            ("qpu_monitor_sample_info", ""),
            ("qpu_monitor_gridpoints", ""),
        )
        for stem, initial in defaults:
            setattr(state, f"{stem}_{slot}", initial)
        _hide_qpu_plots()
    except Exception:
        return
def qpu_remove_monitor_slot(slot_index):
    """Remove one additional QPU monitor slot and shift later slots down.

    Args:
        slot_index: slot to remove; must be between 2 and the last active
            slot (``1 + state.qpu_monitor_count``). Anything else is ignored.

    The freed last slot is reset to its defaults, the slot count is
    decremented and the QPU plots are hidden. Best-effort: exceptions are
    swallowed.
    """
    try:
        s = int(slot_index)
        if s < 2 or s > 5:
            return
        cnt = int(state.qpu_monitor_count or 0)
        if cnt <= 0:
            return
        last_slot = 1 + cnt
        if s > last_slot:
            # The slot is not active. Without this guard the shift loop is
            # empty, yet the last active slot would still be cleared and
            # the count decremented.
            return
        # Move every later slot one position down, field by field.
        for k in range(s, last_slot):
            setattr(state, f"qpu_field_components_{k}", getattr(state, f"qpu_field_components_{k+1}", "Ez"))
            setattr(state, f"qpu_monitor_gridpoints_{k}", getattr(state, f"qpu_monitor_gridpoints_{k+1}", ""))
            setattr(state, f"qpu_monitor_samples_{k}", getattr(state, f"qpu_monitor_samples_{k+1}", ""))
            setattr(state, f"qpu_monitor_sample_info_{k}", getattr(state, f"qpu_monitor_sample_info_{k+1}", ""))
        # Reset the slot that is no longer in use.
        setattr(state, f"qpu_field_components_{last_slot}", "Ez")
        setattr(state, f"qpu_monitor_gridpoints_{last_slot}", "")
        setattr(state, f"qpu_monitor_samples_{last_slot}", "")
        setattr(state, f"qpu_monitor_sample_info_{last_slot}", "")
        state.qpu_monitor_count = max(0, cnt - 1)
        _hide_qpu_plots()
    except Exception:
        pass


ctrl.qpu_add_monitor_slot = qpu_add_monitor_slot
ctrl.qpu_remove_monitor_slot = qpu_remove_monitor_slot
# NEW: helper to build Plotly figure for all components except the selected one
def _rebuild_qpu_fig_others(selected_field: str, position_filter: str = "All positions"):
    """Build a Plotly figure containing all series except the selected component.

    Reads ``qpu_ts_cache["times"]`` and ``qpu_ts_cache["series_map"]``; each
    key of the series map is unpacked as a ``(field, px, py)`` tuple.

    Args:
        selected_field: Component to leave out of the figure. ``None``, an
            empty string or "All" means nothing is selected, so there are no
            "other" components to show.
        position_filter: Only the unfiltered values ("", "All",
            "All positions") are supported; any other value yields ``None``.

    Returns:
        A new ``go.Figure``, or ``None`` if not applicable (no selection, an
        active position filter, no series for any other component, or an
        error while building the figure).
    """
    try:
        sel = (selected_field or "All").strip()
        if sel in ("", "All"):
            return None
        if position_filter not in ("", "All", "All positions"):
            return None
        times = qpu_ts_cache.get("times") or []
        series_map = qpu_ts_cache.get("series_map") or {}
        # "Others" means every cached series whose component differs from the
        # selection. The previous `== sel` test returned the selected
        # component instead, contradicting the function's name and contract.
        keys = [k for k in series_map if str(k[0]) != sel]
        if not keys:
            return None
        sorted_keys = sorted(keys, key=lambda k: (str(k[0]), k[1], k[2]))

        # Number of series per component, used to spread the colour shades
        # of each component's colormap over its own series.
        per_field_total = {}
        for k in sorted_keys:
            label = str(k[0])
            per_field_total[label] = per_field_total.get(label, 0) + 1
        per_field_rank = {}
        cmaps = {}

        fig = go.Figure()
        dashes = ["solid", "dash", "dot", "dashdot"]
        markers = ["circle", "square", "diamond", "triangle-up", "x"]
        max_abs = 0.0
        for i, k in enumerate(sorted_keys):
            field_name, px, py = k
            label = str(field_name)
            rank = per_field_rank.get(label, 0)
            per_field_rank[label] = rank + 1
            ys = series_map.get(k) or []
            # Skip series that are empty or not aligned with the time axis.
            if not ys or len(ys) != len(times):
                continue
            try:
                max_abs = max(max_abs, max(abs(float(v)) for v in ys))
            except (TypeError, ValueError):
                # Non-numeric samples only affect the automatic y-range.
                pass
            # Shade keyed by the series' rank inside its own component so
            # that series of one component stay distinguishable.
            total = per_field_total[label]
            if total > 1:
                s_light = 0.3 + 0.6 * (rank / (total - 1))
            else:
                s_light = 0.6
            if label not in cmaps:
                cmaps[label] = _cmap_for_field(label)
            rgba = cmaps[label](s_light)
            color_hex = f"#{int(rgba[0]*255):02x}{int(rgba[1]*255):02x}{int(rgba[2]*255):02x}"
            fig.add_trace(go.Scatter(
                x=times, y=ys, mode='lines+markers', name=f"{label} ({px}, {py})",
                line=dict(color=color_hex, width=2.5, dash=dashes[i % len(dashes)]),
                marker=dict(size=7, symbol=markers[i % len(markers)], color=color_hex),
                hovertemplate=f"{label} | t=%{{x:.3f}}s<br>Value=%{{y:.6g}}<extra>({px}, {py})</extra>",
            ))
        fig.update_layout(title=f"IBM QPU Time Series (Other components: {sel})", height=660, width=900, margin=dict(l=50, r=30, t=50, b=50), hovermode="x unified", legend=dict(orientation='h', yanchor='bottom', y=1.02, xanchor='right', x=1, title_text=""))
        fig.update_xaxes(title_text="Time (s)", title_font=dict(size=22), tickfont=dict(size=16), showgrid=True, gridcolor="rgba(0,0,0,.06)", zeroline=False, showline=True, linewidth=1, linecolor="rgba(0,0,0,.3)")
        fig.update_yaxes(title_text="Field Value", title_font=dict(size=22), tickfont=dict(size=16), showgrid=True, gridcolor="rgba(0,0,0,.06)", zeroline=True, zerolinecolor="rgba(0,0,0,.25)", showline=True, linewidth=1, linecolor="rgba(0,0,0,.3)")
        if max_abs > 0:
            # Symmetric y-range with 12 % headroom around the largest sample.
            pad = 0.12 * max_abs
            fig.update_yaxes(range=[-max_abs - pad, max_abs + pad])
        return fig
    except Exception:
        # Best effort: callers treat None as "nothing to show".
        return None
# --- UI Layout ---
def build_ui():
"""Render the EM UI inside the host layout.

Must be called after set_server(server): raises RuntimeError when the
module-level server handle is missing or when state/ctrl are not bound.

Besides declaring the widgets, this function stores the update methods of
the figure/view widgets on the controller so that handlers can refresh them:
ctrl.qubit_plot_update, ctrl.geometry_preview_update,
ctrl.excitation_preview_update, ctrl.view_update, ctrl.sim_ts_update and
ctrl.qpu_ts_update.

NOTE(review): the exact widget nesting depends on the file's indentation;
the section comments below describe the intended grouping.
"""
# Guard: refuse to build the UI before the server, state and ctrl are bound.
if _server is None or not state.bound or not ctrl.bound:
raise RuntimeError('Call set_server(server) before build_ui().')
with vuetify3.VContainer(fluid=True, classes="pa-0 fill-height"):
# Geometry upload dialog
with vuetify3.VDialog(v_model=("show_upload_dialog", False), max_width="500px"):
with vuetify3.VCard():
vuetify3.VCardTitle("Upload Geometry")
with vuetify3.VCardText(classes="py-1 px-2"):
vuetify3.VFileInput(
show_size=True,
label="Select geometry file",
accept=".vtp,.vtk,.glb,.stl",
update_binary=("uploaded_file_info", 1),
)
with vuetify3.VCardActions():
vuetify3.VSpacer()
vuetify3.VBtn("Cancel", click="show_upload_dialog = false")
# Snackbars showing the upload and export status messages
vuetify3.VSnackbar(
v_model=("show_upload_status", False),
children=["{{ upload_status_message }}"],
timeout=4000,
location="bottom right",
color="primary",
variant="tonal",
)
vuetify3.VSnackbar(
v_model=("show_export_status", False),
children=["{{ export_status_message }}"],
timeout=4000,
location="bottom right",
color="primary",
variant="tonal",
)
with vuetify3.VRow(no_gutters=True, classes="fill-height"):
with vuetify3.VCol(cols=5, classes="pa-1 d-flex flex-column"):
# Cell 1: Introduction
with vuetify3.VCard(classes="mb-1", style=("overview_card_style", "font-size: 0.8rem;")):
with vuetify3.VCardTitle("Overview", classes="text-subtitle-1 text-primary", style="font-size: 0.9rem; padding: 6px 10px;"):
pass
with vuetify3.VCardText(classes="py-1 px-2"):
vuetify3.VSelect(
label="Select a problem",
v_model=("problem_selection", None),
items=(
"problem_options",
[
"Propagation in a given medium (no bodies)",
"Scattering from a perfectly conducting body",
],
),
placeholder="Select a problem",
density="compact",
color="primary",
)
vuetify3.VDivider(classes="my-0")
vuetify3.VCardSubtitle("Governing Equations", classes="text-caption font-weight-bold mt-0", style="font-size: 0.7rem;")
vuetify3.VDivider(classes="mb-0")
vuetify3.VListItemTitle("Maxwell’s time-domain, 2D, TEz polarized.", classes="text-caption", style="font-size: 0.7rem;")
vuetify3.VCardSubtitle("Inputs", classes="text-caption font-weight-bold mt-0", style="font-size: 0.7rem;")
vuetify3.VDivider(classes="mb-0")
vuetify3.VListItemTitle("Geometry, excitation, medium, output visualization preferences.", classes="text-caption", style="font-size: 0.7rem;")
vuetify3.VCardSubtitle("Outputs", classes="text-caption font-weight-bold mt-0", style="font-size: 0.7rem;")
vuetify3.VDivider(classes="mb-0")
vuetify3.VListItemTitle("Surface plots of field components OR time evolution of field components at specified points.", classes="text-caption", style="font-size: 0.7rem;")
# Cell 2: Geometry
with vuetify3.VCard(classes="mb-1", style=("geometry_card_style", "font-size: 0.8rem;")):
with vuetify3.VCardTitle("Geometry", classes="text-subtitle-1 text-primary", style="font-size: 0.9rem; padding: 6px 10px;"):
pass
with vuetify3.VCardText(classes="py-1 px-2"):
vuetify3.VSelect(
label="Select",
v_model=("geometry_selection", None),
items=("geometry_options",),
placeholder="Select",
density="compact",
color="primary",
)
with vuetify3.VContainer(v_if="geometry_selection === 'Square Metallic Body'", classes="pa-0 mt-2"):
with vuetify3.VRow(dense=True):
with vuetify3.VCol():
with vuetify3.VTooltip("Square hole edge length s in domain units [0,1]. Must be ≤ 1. UI-only.", location="bottom", color="primary"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VTextField(
v_bind="props",
v_model=("hole_size_edge", 0.2),
label="Hole Edge Length [0 - 1]",
type="number",
step=0.05,
min=0,
density="compact",
color="primary",
)
with vuetify3.VCol():
with vuetify3.VTooltip("Hole center as (x, y). Both x and y must be strictly within (0,1). Example: (0.5, 0.5). UI-only.", location="bottom", color="primary"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VTextField(
v_bind="props",
v_model=("hole_center_pair", "(0.5, 0.5)"),
label="Hole Center (X, Y)",
density="compact",
color="primary",
)
with vuetify3.VRow(dense=True, classes="mt-1"):
with vuetify3.VCol(cols=12):
vuetify3.VSwitch(
v_model=("hole_snap", True),
label="Snap edges to nearest grid lines",
color="primary",
inset=True,
density="compact",
)
vuetify3.VAlert(
v_if="hole_error_message",
type="error",
variant="tonal",
density="compact",
children=["{{ hole_error_message }}"],
classes="mt-1",
)
# Cell 3: Excitation
with vuetify3.VCard(classes="mb-1", style=("excitation_card_style", "font-size: 0.8rem;")):
with vuetify3.VCardTitle("Excitation: Initial State", classes="text-subtitle-1 text-primary", style="font-size: 0.9rem; padding: 6px 10px;"):
pass
with vuetify3.VCardText(classes="py-1 px-2"):
with vuetify3.VRow(classes="d-flex align-center", dense=True, no_gutters=True):
with vuetify3.VCol(classes="flex-grow-1"):
vuetify3.VSelect(
label="Select",
v_model=("dist_type", None),
items=("dist_type_options", ["None", "Delta", "Gaussian"]),
placeholder="Select",
density="compact",
color="primary",
)
with vuetify3.VCol(cols="auto", classes="ml-1"):
with vuetify3.VTooltip("Show or hide configuration for the selected excitation", location="bottom", color="primary"):
with vuetify3.Template(v_slot_activator="{ props }"):
with vuetify3.VBtn(
icon=True,
density="compact",
variant="text",
click="excitation_config_open = !excitation_config_open",
disabled=("!dist_type", False),
v_bind="props",
):
vuetify3.VIcon("mdi-cog", color=("excitation_config_open ? 'primary' : 'grey'",))
trame_html.Span("Toggle parameter inputs for the chosen excitation")
with vuetify3.VExpandTransition():
with vuetify3.VSheet(
v_if="excitation_config_open && dist_type === 'Delta'",
classes="pa-2 mt-1 rounded-lg",
style="background-color: rgba(95,37,159,0.03); border: 1px solid rgba(95,37,159,0.15);",
):
with vuetify3.VTooltip("Impulse position (x, y) in [0,1]. Example: (0.6, 0.6).", location="bottom", color="primary"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VTextField(
v_bind="props",
v_model=("peak_pair", "(0.5, 0.5)"),
label="Peak (x, y) in [0,1]",
density="compact",
color="primary",
)
with vuetify3.VExpandTransition():
with vuetify3.VSheet(
v_if="excitation_config_open && dist_type === 'Gaussian'",
classes="pa-2 mt-1 rounded-lg",
style="background-color: rgba(95,37,159,0.03); border: 1px solid rgba(95,37,159,0.15);",
):
with vuetify3.VRow(dense=True):
with vuetify3.VCol():
with vuetify3.VTooltip("Gaussian center μ (x, y) in [0,1]. Example: (0.5, 0.5).", location="bottom", color="primary"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VTextField(
v_bind="props",
v_model=("mu_pair", "(0.5, 0.5)"),
label="Mu (x, y) in [0,1]",
density="compact",
color="primary",
)
# Separate Sigma inputs (normalized)
with vuetify3.VRow(dense=True, classes="mt-1"):
with vuetify3.VCol():
with vuetify3.VTooltip("Gaussian spread σx in [0,1] of domain length.", location="bottom", color="primary"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VTextField(
v_bind="props",
v_model=("sigma_x", 0.25),
label="Sigma X (0–1)",
type="number",
step="0.01",
density="compact",
color="primary",
)
with vuetify3.VCol():
with vuetify3.VTooltip("Gaussian spread σy in [0,1] of domain length.", location="bottom", color="primary"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VTextField(
v_bind="props",
v_model=("sigma_y", 0.15),
label="Sigma Y (0–1)",
type="number",
step="0.01",
density="compact",
color="primary",
)
vuetify3.VAlert(v_if="excitation_error_message", type="error", variant="tonal", density="compact", children=["{{ excitation_error_message }}"], classes="mt-1")
vuetify3.VAlert(
v_if="excitation_info_message",
type="info",
variant="tonal",
density="compact",
children=["{{ excitation_info_message }}"],
classes="mt-1",
style="white-space: pre-line;",
)
# Cell 4: Medium
with vuetify3.VCard(classes="mb-1", style="font-size: 0.8rem;"):
with vuetify3.VCardTitle("Material Properties (Medium)", classes="text-subtitle-1 text-primary", style="font-size: 0.9rem; padding: 6px 10px;"):
pass
with vuetify3.VCardText(classes="py-1 px-2"):
with vuetify3.VRow(dense=True):
with vuetify3.VCol(cols="6"):
with vuetify3.VTooltip("Relative permittivity. ε_r = ε / ε₀. Default 1.0 (free space).", location="bottom", color="primary"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VTextField(v_bind="props", v_model=("coeff_permittivity", 1.0), label="Permittivity (εr)", type="number", step="0.1", density="compact", color="primary")
with vuetify3.VCol(cols="6"):
with vuetify3.VTooltip("Relative permeability. μ_r = μ / μ₀. Default 1.0 (non-magnetic).", location="bottom", color="primary"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VTextField(v_bind="props", v_model=("coeff_permeability", 1.0), label="Permeability (μr)", type="number", step="0.1", density="compact", color="primary")
# New Time card between Material and Backend sections
with vuetify3.VCard(classes="mb-1", style="font-size: 0.8rem;"):
with vuetify3.VCardTitle("Time", classes="text-subtitle-1 text-primary", style="font-size: 0.9rem; padding: 6px 10px;"):
pass
with vuetify3.VCardText(classes="py-1 px-2"):
with vuetify3.VTooltip("Sets the total duration for the simulation to run.", location="bottom", color="primary"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VTextField(
v_bind="props",
v_model=("T", 1.0),
label="Total Time (T)",
type="number",
step="0.1",
density="compact",
color="primary",
)
# Moved Meshing card: now under Temporal Settings and before Backends
with vuetify3.VCard(classes="mb-1", style=("meshing_card_style", "font-size: 0.8rem;")):
with vuetify3.VCardTitle("Meshing", classes="text-subtitle-1 text-primary", style="font-size: 0.9rem; padding: 6px 10px;"):
pass
with vuetify3.VCardText(classes="py-1 px-2"):
# Show the qubit graph only while hovering over the slider (like app.py)
with vuetify3.VMenu(open_on_hover=True, close_on_content_click=True, location="end"):
with vuetify3.Template(v_slot_activator="{ props }"):
with vuetify3.VSlider(
v_bind="props",
v_model=("nx_slider_index", None),
label="No. of points per direction:",
min=0,
max=5,
step=1,
show_ticks="always",
thumb_label="always",
density="compact",
color="primary",
):
vuetify3.Template(v_slot_thumb_label="{ modelValue }", children=["{{ modelValue === null ? 'Select' : [16, 32, 64, 128, 256, 512][modelValue] }}"])
# Hover content: enlarged Plotly graph with app.py axes (x=nx, y=nq)
with vuetify3.VSheet(classes="pa-2", elevation=6, rounded=True, style="width: 644px;"):
qubit_fig_widget = plotly_widgets.Figure(
figure=build_qubit_plot(int(state.nx or 16)),
responsive=True,
style="width: 616px; height: 364px; min-height: 364px;",
)
# Keep the widget's update method on ctrl so handlers can redraw the qubit plot.
ctrl.qubit_plot_update = qubit_fig_widget.update
# Cell: Backends (from app.py)
with vuetify3.VCard(classes="mb-1", style=("backend_card_style", "font-size: 0.8rem;")):
with vuetify3.VCardTitle("Backends", classes="text-subtitle-1 text-primary", style="font-size: 0.9rem; padding: 6px 10px;"):
pass
with vuetify3.VCardText(classes="py-1 px-2"):
with vuetify3.VRow(dense=True, classes="mb-2"):
with vuetify3.VCol():
vuetify3.VAlert(
type="info",
color="primary",
variant="tonal",
density="compact",
children=[
"Selected: ",
"{{ backend_type || '—' }}",
" - ",
"{{ backend_type === 'Simulator' ? selected_simulator : (backend_type === 'QPU' ? selected_qpu : '—') }}",
],
)
with vuetify3.VMenu(open_on_hover=True, close_on_content_click=True, location="end"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VBtn(v_bind="props", text="Choose Backend", color="primary", variant="tonal", block=True)
with vuetify3.VList(density="compact"):
with vuetify3.VMenu(open_on_hover=True, close_on_content_click=True, location="end", offset=8):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VListItem(v_bind="props", title="Simulator", prepend_icon="mdi-robot-outline", append_icon="mdi-chevron-right")
with vuetify3.VList(density="compact"):
vuetify3.VListItem(title="IBM Qiskit simulator", click="backend_type = 'Simulator'; selected_simulator = 'IBM Qiskit simulator'")
with vuetify3.VMenu(open_on_hover=True, close_on_content_click=True, location="end", offset=8):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VListItem(v_bind="props", title="QPU", prepend_icon="mdi-chip", append_icon="mdi-chevron-right")
with vuetify3.VList(density="compact"):
vuetify3.VListItem(title="IBM QPU", click="backend_type = 'QPU'; selected_qpu = 'IBM QPU'")
vuetify3.VListItem(title="IonQ QPU", click="backend_type = 'QPU'; selected_qpu = 'IonQ QPU'")
# New Card: Output Preferences (appears after backend selection)
with vuetify3.VCard(v_if="backend_type", classes="mb-0", style=("output_card_style", "font-size: 0.8rem;")):
with vuetify3.VCardTitle("Output Preferences", classes="text-subtitle-1 text-primary", style="font-size: 0.9rem; padding: 6px 10px;"):
pass
with vuetify3.VCardText(classes="py-1 px-2"):
# Small bold heading before Δt input
vuetify3.VCardSubtitle("Select Δt intervals for plotting output snapshots", classes="text-caption font-weight-bold mt-1", style="font-size: 0.75rem;")
with vuetify3.VTooltip("Snapshot interval (Δt). Solver runs at fixed 0.1 s; frames are saved every Δt. Values < 0.1 or not multiples of 0.1 are unsupported.", location="bottom", color="primary"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VTextField(v_bind="props", v_model=("dt_user", 0.1), label="Δt", type="number", step="0.1", density="compact", color="primary", classes="mt-1")
vuetify3.VAlert(v_if="temporal_warning", type="warning", variant="tonal", density="compact", children=["{{ temporal_warning }}"], classes="mt-1")
# Updated QPU monitor options (IBM / IonQ)
with vuetify3.VContainer(v_if="backend_type === 'QPU'", classes="pa-0 mt-2"):
with vuetify3.VRow(dense=True, classes="mb-1 align-center"):
with vuetify3.VCol(cols=4, sm=3, md=3):
vuetify3.VSelect(
label="Field",
v_model=("qpu_field_components", "Ez"),
items=("qpu_field_options", ["All", "Ez", "Hx", "Hy"]),
density="compact",
color="primary",
hide_details=True,
style="max-width: 160px;",
)
with vuetify3.VCol(cols=8, sm=9, md=9):
vuetify3.VTextField(
label="Sample position(s) (x, y) in [0,1]",
v_model=("qpu_monitor_samples", "(0.5, 0.5)"),
density="compact",
color="primary",
hide_details=True,
style="max-width: 320px;",
)
vuetify3.VAlert(
v_if="qpu_monitor_sample_info",
type="info",
variant="tonal",
density="compact",
children=["{{ qpu_monitor_sample_info }}"],
classes="mb-1",
style="white-space: pre-line;",
)
# Run Simulation and Stop Buttons Row
with vuetify3.VRow(dense=True, classes="mb-2"):
with vuetify3.VCol(cols=9):
with vuetify3.VTooltip("Starts the quantum simulation with the specified parameters. This may take some time.", location="bottom", color="primary"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VBtn(
v_bind="props",
text=("run_button_text", "RUN!"),
click=run_simulation_only,
color="primary",
block=True,
disabled=("is_running || run_button_text === 'Successful!' || !geometry_selection || !dist_type || !!temporal_warning || nx === null || !backend_type", False),
)
with vuetify3.VCol(cols=3):
with vuetify3.VTooltip("Stop the running simulation", location="bottom", color="primary"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VBtn(
v_bind="props",
text="Stop",
click=stop_simulation_handler,
color="error",
block=True,
disabled=("stop_button_disabled", True),
)
vuetify3.VSpacer()
# Reset Button
with trame_html.Div(style="flex: 0 0 auto;"):
with vuetify3.VTooltip("Reset all parameters to their default values", location="bottom", color="primary"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VBtn(
v_bind="props",
text="Reset",
click=reset_to_defaults,
color="secondary",
block=True,
)
# Main graph column
with vuetify3.VCol(cols=7, classes="pa-1 d-flex flex-column"):
# Output Configuration (appears after simulation) – hidden for QPU
with vuetify3.VCard(v_if="simulation_has_run && backend_type !== 'QPU'", classes="mb-1", style="font-size: 0.8rem;"):
with vuetify3.VCardSubtitle("Output Configuration", classes="text-subtitle-1 text-primary", style="font-size: 0.9rem; padding: 6px 10px; font-weight: 600; color: #1A1A1A;"):
with vuetify3.VCardText(classes="py-1 px-2", style="color: #1A1A1A;"):
with vuetify3.VRadioGroup(v_model=("output_type", "Surface Plot"), row=True, density="compact", color="primary"):
vuetify3.VRadio(label="Surface", value="Surface Plot", style="font-weight: 500;")
vuetify3.VRadio(label="Time Series", value="Time Series Plot", style="font-weight: 500;")
with vuetify3.VContainer(v_if="output_type === 'Surface Plot'", classes="pa-0"):
vuetify3.VSelect(v_model=("surface_field", "Ez"), items=("surface_field_options", ["Ez", "Hx", "Hy"]), label="Field Component", density="compact", color="primary", style="font-weight: 500;")
# Replace export format dropdown and individual buttons with a single Export menu
with vuetify3.VMenu(open_on_hover=True, close_on_content_click=True, location="end"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VBtn(v_bind="props", text="DOWNLOAD", color="primary", variant="tonal", block=True, classes="mt-1")
with vuetify3.VList(density="compact"):
vuetify3.VListSubheader("VTK")
vuetify3.VListItem(title="Current frame (VTK)", prepend_icon="mdi-download", click=export_vtk)
vuetify3.VListItem(title="All frames (VTK sequence)", prepend_icon="mdi-download-multiple", click=export_vtk_all_frames)
vuetify3.VDivider()
vuetify3.VListItem(title="Animation (MP4)", prepend_icon="mdi-movie", click=export_mp4)
with vuetify3.VContainer(v_if="output_type === 'Time Series Plot'", classes="pa-0"):
vuetify3.VSelect(v_model=("timeseries_field", "Ez"), items=("timeseries_field_options", ["All", "Ez", "Hx", "Hy"]), label="Field Component", density="compact", color="primary", style="font-weight: 500;")
vuetify3.VTextarea(
v_model=("timeseries_points", "(0.5, 0.5)"),
label="Monitor Position(s) (x, y) in [0,1]",
hint="e.g., (0.50, 0.50) or multiple comma-separated pairs",
rows=2,
auto_grow=True,
color="primary",
style="font-weight: 500;",
)
vuetify3.VAlert(
v_if="timeseries_point_info",
type="info",
variant="tonal",
density="compact",
children=["{{ timeseries_point_info }}"],
classes="mt-1",
style="white-space: pre-line;",
)
# DOWNLOAD menu for Simulator Time Series
with vuetify3.VMenu(open_on_hover=True, close_on_content_click=True, location="end"):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VBtn(v_bind="props", text="DOWNLOAD", color="primary", variant="tonal", block=True, classes="mt-1")
with vuetify3.VList(density="compact"):
vuetify3.VListItem(title="Download CSV", prepend_icon="mdi-download", click=export_sim_timeseries_csv)
vuetify3.VListItem(title="Download PNG", prepend_icon="mdi-image", click=export_sim_timeseries_png)
vuetify3.VListItem(title="Download HTML", prepend_icon="mdi-file-html", click=export_sim_timeseries_html)
# Main plot area (PyVista) – keep visible for QPU until data replaces it
with vuetify3.VCard(
v_if="geometry_selection && (backend_type !== 'QPU' || !qpu_ts_ready)",
classes="mb-1 flex-grow-1 d-flex flex-column",
style="min-height: 0;",
):
with vuetify3.VContainer(v_if="is_running", fluid=True, classes="fill-height d-flex flex-column align-center justify-center"):
vuetify3.VProgressCircular(indeterminate=True, size=64, color="primary")
vuetify3.VCardSubtitle("Running simulation...", classes="mt-4")
with vuetify3.VContainer(
v_if="!is_running && geometry_selection && !dist_type",
fluid=True,
classes="pa-0 flex-grow-1 d-flex align-center justify-center",
style="width: 100%; min-height: 560px;",
):
geometry_preview_widget = plotly_widgets.Figure(
figure=_build_geometry_placeholder("Select a geometry to preview."),
responsive=True,
style="width: 100%; height: 100%;",
)
ctrl.geometry_preview_update = geometry_preview_widget.update
with vuetify3.VContainer(
v_if="!is_running && dist_type && !simulation_has_run",
fluid=True,
classes="pa-0 flex-grow-1 d-flex align-center justify-center",
style="width: 100%; min-height: 580px;",
):
excitation_preview_widget = plotly_widgets.Figure(
figure=_build_excitation_placeholder("Select an excitation to preview."),
responsive=True,
style="width: 100%; height: 100%;",
)
ctrl.excitation_preview_update = excitation_preview_widget.update
# Surface Plot: PyVista view
with vuetify3.VContainer(v_if="!is_running && simulation_has_run && output_type === 'Surface Plot'", fluid=True, classes="pa-0", style=("pyvista_view_style", "aspect-ratio: 1 / 1; width: 100%;")):
view = plotter_ui(plotter)
ctrl.view_update = view.update
# Time Series: Plotly figure with fixed size
with vuetify3.VContainer(v_if="!is_running && simulation_has_run && output_type === 'Time Series Plot'", fluid=True, classes="d-flex align-center justify-center pa-2", style="overflow: hidden;"):
sim_ts = plotly_widgets.Figure(
figure=go.Figure(layout=dict(width=900, height=660)),
responsive=False,
style="width: 900px; height: 660px;",
)
ctrl.sim_ts_update = sim_ts.update
# Time slider for surface plot (moved inside VCard)
with vuetify3.VContainer(v_if="simulation_has_run && output_type === 'Surface Plot' && backend_type !== 'QPU'", fluid=True, classes="pa-0 mt-2"):
vuetify3.VSlider(v_model=("time_val", 0.0), label="Time", min=0, max=("T", 10.0), step=("dt_user", 0.1), thumb_label="always", density="compact", color="primary")
# Main plot area for QPU (Plotly)
with vuetify3.VCard(v_if="geometry_selection && backend_type === 'QPU' && (is_running || qpu_ts_ready)", classes="flex-grow-1", style="min-height: 0;"):
with vuetify3.VContainer(v_if="is_running", fluid=True, classes="fill-height d-flex flex-column align-center justify-center"):
vuetify3.VProgressCircular(indeterminate=True, size=64, color="primary")
vuetify3.VCardSubtitle("Running QPU...", classes="mt-4")
# Compact toolbar + plot directly below (no extra vertical stretch)
with vuetify3.VContainer(v_if="!is_running && qpu_ts_ready", fluid=True, classes="pa-2"):
with vuetify3.VToolbar(density="compact", flat=True, color="transparent", classes="px-0"):
vuetify3.VSelect(
label="Component",
v_model=("qpu_plot_filter", "All"),
items=("qpu_plot_field_options", ["All"]),
density="compact",
color="primary",
hide_details=True,
style="max-width: 180px; margin-right: 8px;",
disabled=("!qpu_ts_ready", True),
update_modelValue=(ctrl.qpu_set_plot_filter, "[$event]")
)
vuetify3.VSelect(
label="Position",
v_model=("qpu_plot_position_filter", "All positions"),
items=("qpu_plot_position_options", ["All positions"]),
density="compact",
color="primary",
hide_details=True,
style="max-width: 200px; margin-right: 8px;",
disabled=("!qpu_ts_ready", True),
update_modelValue=(ctrl.qpu_set_plot_position_filter, "[$event]")
)
vuetify3.VSpacer()
vuetify3.VBtn(
text="Clear Selection",
color="secondary",
variant="text",
click=ctrl.on_qpu_ts_clear,
disabled=("!qpu_ts_ready", True),
)
with vuetify3.VMenu(open_on_hover=True, close_on_content_click=True, location="end", offset=8):
with vuetify3.Template(v_slot_activator="{ props }"):
vuetify3.VBtn(
v_bind="props",
text="DOWNLOAD",
color="primary",
variant="tonal",
disabled=("!qpu_ts_ready", True),
)
with vuetify3.VList(density="compact"):
vuetify3.VListItem(title="Download CSV", prepend_icon="mdi-download", click=export_qpu_timeseries_csv, disabled=("!qpu_ts_ready", True))
vuetify3.VListItem(title="Download PNG", prepend_icon="mdi-image", click=export_qpu_timeseries_png, disabled=("!qpu_ts_ready", True))
vuetify3.VListItem(title="Download HTML", prepend_icon="mdi-file-html", click=export_qpu_timeseries_html, disabled=("!qpu_ts_ready", True))
# Tiny spacer under toolbar
trame_html.Div(style="height: 6px;")
# Plotly figure directly under toolbar
qpu_ts_widget = plotly_widgets.Figure(
figure=go.Figure(layout=dict(width=900, height=660)),
responsive=False,
style=("qpu_plot_style", "display: none; width: 900px; height: 660px; margin: 0 auto;"),
click=ctrl.on_qpu_ts_click,
)
ctrl.qpu_ts_update = qpu_ts_widget.update
# Placeholder when no geometry selected
with vuetify3.VContainer(v_if="!geometry_selection", fluid=True, classes="flex-grow-1 d-flex align-center justify-center text-medium-emphasis"):
vuetify3.VCardText("Select a geometry to display the preview and results.")
# Console Window
with vuetify3.VCard(classes="mt-2", style="font-size: 0.8rem;"):
with vuetify3.VCardTitle("Status", classes="text-subtitle-1 text-primary", style="font-size: 0.9rem; padding: 6px 10px;"):
pass
with vuetify3.VCardText(classes="py-1 px-2", style="height: 150px; overflow-y: auto; background-color: #f5f5f5; font-family: monospace;"):
vuetify3.VTextarea(
v_model=("console_output", ""),
readonly=True,
auto_grow=False,
rows=6,
variant="plain",
hide_details=True,
style="font-family: monospace; width: 100%; height: 100%;"
)
# Status Window - Fixed at bottom right
with vuetify3.VCard(
v_if="status_visible",
style="position: fixed; bottom: 16px; right: 16px; z-index: 1000; min-width: 320px; max-width: 450px;",
elevation=8
):
with vuetify3.VCardTitle(classes="d-flex align-center", style="font-size: 0.95rem; padding: 8px 12px;"):
vuetify3.VIcon("mdi-information-outline", size="small", classes="mr-2")
trame_html.Span("Simulation Status")
vuetify3.VSpacer()
vuetify3.VBtn(
icon="mdi-close",
size="x-small",
variant="text",
click="status_visible = false"
)
vuetify3.VDivider()
with vuetify3.VCardText(classes="py-2 px-3"):
# Status message
vuetify3.VAlert(
type=("status_type", "info"),
variant="tonal",
density="compact",
children=["{{ status_message }}"]
)
# Progress bar (shown when simulation is running)
with vuetify3.VContainer(v_if="show_progress", classes="pa-0 mt-2"):
vuetify3.VProgressLinear(
model_value=("simulation_progress", 0),
color="primary",
height=6,
striped=True
)
trame_html.Div(
"{{ simulation_progress }}% complete",
classes="text-caption text-center mt-1",
style="font-size: 0.75rem;"
)
@state.change("nx")
def update_qubit_plot(nx, **kwargs):
    """Redraw the qubit plot for the new ``nx``; failures are ignored."""
    try:
        redraw = ctrl.qubit_plot_update
        redraw(build_qubit_plot(int(nx)))
    except Exception:
        # Best effort: any failure (conversion of nx, missing widget
        # callback, plot construction) leaves the current plot untouched.
        pass
@state.change("hole_size_edge", "hole_center_x", "hole_center_y", "geometry_selection", "hole_snap")
def validate_hole_inputs(**kwargs):
    """Validate the square-hole inputs and refresh the geometry preview.

    Runs only when ``state.geometry_selection`` is "Square Metallic Body";
    for any other geometry the hole error message is simply cleared.

    Checks, in order: size and center are numeric; the edge length lies in
    [0, 1]; the center is strictly inside (0, 1) on both axes; and, when
    snapping is disabled, ``_compute_hole_edges`` returns edges (it returning
    ``None`` is reported as a grid-alignment error). The first failing check
    writes ``state.hole_error_message`` and stops. When every check passes
    the message is cleared and ``update_geometry_hole_preview()`` is called.
    """
    # Only validate when Square Metallic Body is selected
    if state.geometry_selection != "Square Metallic Body":
        state.hole_error_message = ""
        return
    try:
        s = float(state.hole_size_edge)
        cx = float(state.hole_center_x)
        cy = float(state.hole_center_y)
    except (TypeError, ValueError, OverflowError):
        state.hole_error_message = "Hole size and center must be numeric."
        return
    # Use selected nx, fall back to a safe default when not selected yet
    try:
        nx = int(state.nx or 32)
    except Exception:
        nx = 32
    if s > 1.0:
        state.hole_error_message = "Hole edge length must be <= 1."
        return
    if not (0.0 <= s):
        # Also catches NaN, for which every comparison is False. Previously
        # only the upper bound was checked, so negative/NaN sizes slipped
        # through to the edge computation.
        state.hole_error_message = "Hole edge length must be within [0, 1]."
        return
    if not (0.0 < cx < 1.0) or not (0.0 < cy < 1.0):
        state.hole_error_message = "Hole center must be strictly within (0, 1) for both X and Y."
        return
    # Alignment check (strict vs snap)
    mode_snap = bool(state.hole_snap)
    edges = _compute_hole_edges(nx, cx, cy, s, snap=mode_snap)
    if not mode_snap and edges is None:
        state.hole_error_message = "Hole edges must align with grid lines; enable Snap to auto-align."
        return
    # Inputs valid; clear error and refresh preview (which builds the mesh and renders)
    state.hole_error_message = ""
    update_geometry_hole_preview()
@state.change("hole_center_pair")
def sync_hole_center_pair(hole_center_pair, **kwargs):
    """Split a "(x, y)" text value into ``state.hole_center_x`` / ``hole_center_y``.

    On success the hole error message is cleared; any failure (including a
    value that does not start with a "(x, y)" pair) sets it to a format hint.
    """
    text = str(hole_center_pair)
    try:
        parsed = re.match(r"\(\s*([-+]?[0-9]*\.?[0-9]+)\s*,\s*([-+]?[0-9]*\.?[0-9]+)\s*\)", text)
        if parsed is None:
            raise ValueError("Invalid format")
        x_text, y_text = parsed.group(1), parsed.group(2)
        state.hole_center_x = float(x_text)
        state.hole_center_y = float(y_text)
        state.hole_error_message = ""
    except Exception:
        state.hole_error_message = "Invalid hole center. Use format (x, y)."
@state.change("sigma_pair")
def sync_sigma_pair(sigma_pair, **kwargs):
    """Split a "(x, y)" Sigma text value into ``state.sigma_x`` / ``sigma_y``.

    Both components are clamped to [0, 1] before being stored. On success the
    excitation error message is cleared; any failure sets it to a format hint.
    """
    text = str(sigma_pair)
    try:
        parsed = re.match(r"\(\s*([-+]?[0-9]*\.?[0-9]+)\s*,\s*([-+]?[0-9]*\.?[0-9]+)\s*\)", text)
        if parsed is None:
            raise ValueError("Invalid format")
        # Clamp each component into the unit interval.
        clamped_x = max(0.0, min(1.0, float(parsed.group(1))))
        clamped_y = max(0.0, min(1.0, float(parsed.group(2))))
        state.sigma_x = clamped_x
        state.sigma_y = clamped_y
        state.excitation_error_message = ""
    except Exception:
        state.excitation_error_message = "Invalid Sigma. Use format (x, y) in [0,1]."
@state.change("dist_type")
def normalize_dist_type(dist_type, **kwargs):
    """React to an excitation-type change.

    ``None``, an empty string and the literal "None" all mean "unselected":
    the state value is normalised to ``None`` and
    ``update_initial_state_preview()`` is called. Any other value calls
    ``update_excitation_info_message()``. In both cases the workflow
    highlights are re-applied afterwards.
    """
    is_unselected = dist_type in (None, "", "None")
    if is_unselected:
        # Allow unselecting via 'None'
        state.dist_type = None
        update_initial_state_preview()
    else:
        update_excitation_info_message()
    _apply_workflow_highlights(_determine_workflow_step())
@state.change("qpu_monitor_samples")
def on_qpu_monitor_samples_change(**_unused):
    """Forward a change of the slot-1 sample text to the slot updater."""
    _update_qpu_sample_slot(1)
@state.change("qpu_monitor_samples_2")
def on_qpu_monitor_samples_2_change(**_unused):
    """Forward a change of the slot-2 sample text to the slot updater."""
    _update_qpu_sample_slot(2)
@state.change("qpu_monitor_samples_3")
def on_qpu_monitor_samples_3_change(**_unused):
    """Forward a change of the slot-3 sample text to the slot updater."""
    _update_qpu_sample_slot(3)
@state.change("qpu_monitor_samples_4")
def on_qpu_monitor_samples_4_change(**_unused):
    """Forward a change of the slot-4 sample text to the slot updater."""
    _update_qpu_sample_slot(4)
@state.change("qpu_monitor_samples_5")
def on_qpu_monitor_samples_5_change(**_unused):
    """Forward a change of the slot-5 sample text to the slot updater."""
    _update_qpu_sample_slot(5)
def _refresh_all_qpu_sample_slots():
    """Run ``_update_qpu_sample_slot`` for monitor slots 1 through 5, in order."""
    for slot_number in (1, 2, 3, 4, 5):
        _update_qpu_sample_slot(slot_number)
@state.change("nx")
def on_nx_change_refresh_qpu_samples(nx, **kwargs):
    """On an ``nx`` change, refresh monitor data and the workflow highlights.

    Calls, in order: the refresh of all QPU sample slots, the simulator
    monitor-point update, and the workflow-highlight update for the current
    workflow step.
    """
    _refresh_all_qpu_sample_slots()
    _update_sim_monitor_points()
    current_step = _determine_workflow_step()
    _apply_workflow_highlights(current_step)
@state.change("dt_user")
def validate_dt_user(dt_user, **kwargs):
    """Validate snapshot Δt: must be >= 0.1 (solver dt) and a multiple of 0.1.

    Writes the outcome to ``state.temporal_warning``: an explanatory message
    when the value is rejected, an empty string when it is accepted.

    Args:
        dt_user: New Δt value from the UI; anything ``float()`` accepts.
    """
    import math  # local import: no module-level dependency is required

    solver_dt = 0.1
    tol = 1e-9
    try:
        dt_val = float(dt_user)
    except (TypeError, ValueError, OverflowError):
        state.temporal_warning = "Δt must be numeric. Frames are captured every Δt."
        return
    if not math.isfinite(dt_val):
        # "nan"/"inf" convert successfully, but round() below would raise
        # ValueError/OverflowError on them and escape this handler.
        state.temporal_warning = "Δt must be numeric. Frames are captured every Δt."
        return
    steps = dt_val / solver_dt
    if dt_val < solver_dt - tol:
        state.temporal_warning = "Δt < 0.1 is unsupported (solver dt = 0.1 s)."
    elif abs(steps - round(steps)) > tol:
        # Tolerance absorbs floating-point noise such as 0.3 / 0.1 != 3.
        state.temporal_warning = "Δt must be a multiple of 0.1 s."
    else:
        state.temporal_warning = ""
# Reset QPU chart visibility when switching backend
@state.change("backend_type")
def on_backend_change(backend_type, **kwargs):
    """Hide the QPU plots when the QPU backend is chosen, then re-apply highlights."""
    switching_to_qpu = backend_type == "QPU"
    if switching_to_qpu:
        _hide_qpu_plots()
    current_step = _determine_workflow_step()
    _apply_workflow_highlights(current_step)
@state.change("selected_qpu")
def on_selected_qpu_change(selected_qpu, **kwargs):
    """Hide the QPU plots when the selected QPU changes while the QPU backend is active."""
    if state.backend_type != "QPU":
        return
    _hide_qpu_plots()
# Component filter changes: intentionally a no-op here (see handler body).
@state.change("qpu_plot_filter")
def on_qpu_plot_filter_change(qpu_plot_filter, **kwargs):
    """Do nothing when ``qpu_plot_filter`` changes.

    No-op: updates are handled by the controller bound to the VSelect, to
    avoid a double refresh.
    """
    return None
# Note: update_initial_state_preview() is called from init_state() after the server is bound