import math

import numpy as np
import pyvista as pv
from qiskit.circuit import QuantumCircuit, QuantumRegister
from qiskit.circuit.library import StatePreparation, QFTGate, RZGate
from qiskit.quantum_info import Statevector


def create_impulse_state(grid_dims, impulse_pos):
    """
    Creates an initial state vector with a single delta impulse at a specified grid position.

    The 2D grid is flattened into a 1D vector in row-major order, and this vector
    is then padded with zeros to the full simulation state space size (4x the grid).

    Args:
        grid_dims (tuple): A tuple (width, height) defining the simulation grid dimensions.
        impulse_pos (tuple): A tuple (x, y) for the position of the impulse.
                             Coordinates are 0-indexed.

    Returns:
        numpy.ndarray: The full, padded initial state vector with a single 1.

    Raises:
        ValueError: If the impulse position is outside the grid dimensions.
    """
    grid_width, grid_height = grid_dims
    impulse_x, impulse_y = impulse_pos

    # Ensure the requested impulse position is actually on the grid.
    if not (0 <= impulse_x < grid_width and 0 <= impulse_y < grid_height):
        raise ValueError(f"Impulse position ({impulse_x}, {impulse_y}) is outside the "
                         f"grid dimensions ({grid_width}x{grid_height}).")

    # Row-major flattening: index = y * width + x.
    flat_index = impulse_y * grid_width + impulse_x

    grid_size = grid_width * grid_height
    total_size = 4 * grid_size  # The simulation space is 4x the grid size.
    initial_state = np.zeros(total_size)
    initial_state[flat_index] = 1
    return initial_state


def create_gaussian_state(grid_dims, mu, sigma):
    """
    Creates an initial state vector with a 2D Gaussian distribution.

    The Gaussian is evaluated on integer grid coordinates, normalized to unit
    L2 norm, flattened in row-major order and zero-padded to 4x the grid size.

    Args:
        grid_dims (tuple): A tuple (width, height) defining the grid dimensions.
        mu (tuple): A tuple (mu_x, mu_y) for the center (mean) of the Gaussian,
                    in grid units.
        sigma (tuple): A tuple (sigma_x, sigma_y) for the standard deviation
                       (spread), in grid units.

    Returns:
        numpy.ndarray: The full, padded initial state vector for the Gaussian state.

    Raises:
        ValueError: If sigma values are not positive.
    """
    grid_width, grid_height = grid_dims
    mu_x, mu_y = mu
    sigma_x, sigma_y = sigma

    if sigma_x <= 0 or sigma_y <= 0:
        raise ValueError("Sigma values (spread) must be positive.")

    # Coordinate grid; meshgrid's default 'xy' indexing gives arrays of shape
    # (height, width), so flattening is row-major in y.
    x = np.arange(0, grid_width)
    y = np.arange(0, grid_height)
    X, Y = np.meshgrid(x, y)

    gaussian_2d = np.exp(-((X - mu_x)**2 / (2 * sigma_x**2)) - ((Y - mu_y)**2 / (2 * sigma_y**2)))

    # A valid quantum state needs unit L2 norm.
    norm = np.linalg.norm(gaussian_2d)
    if norm > 0:
        gaussian_2d = gaussian_2d / norm

    gaussian_flat = gaussian_2d.flatten()
    grid_size = grid_width * grid_height
    total_size = 4 * grid_size
    initial_state = np.pad(gaussian_flat, (0, total_size - grid_size), mode='constant')
    return initial_state


# --- Continuous-position helpers for excitation before meshing ---
def _normalize_to_unit(vec: np.ndarray) -> np.ndarray:
    """Return vec scaled to unit L2 norm; an all-zero vector is returned unchanged."""
    n = np.linalg.norm(vec)
    return vec / n if n > 0 else vec


def _embed_in_state(field: np.ndarray, grid_size: int) -> np.ndarray:
    """Place a flattened grid field at the front of a zero vector of length 4 * grid_size."""
    state = np.zeros(4 * grid_size)
    state[:grid_size] = field
    return state


def create_impulse_state_from_pos(grid_dims, pos01, snap_to_grid=True):
    """
    Create a delta-like initial state from continuous position pos01=(x,y) in [0,1].

    The simulation runs on a discrete nx-by-ny lattice, so the continuous
    position is mapped to grid coordinates with (nx-1) and (ny-1) and then
    discretized. Positions outside [0, 1] are clamped.

    Args:
        grid_dims: Tuple (nx, ny) defining the simulation grid dimensions.
        pos01: Tuple (x, y) with continuous position in [0, 1] range.
        snap_to_grid: If True (default), the impulse is placed exactly on the
                      nearest grid node, giving an exact delta with peak 1.0.
                      If False, bilinear interpolation distributes the amplitude
                      over the 4 neighboring nodes and the result is rescaled
                      to unit L2 norm (e.g. peak 0.5 when exactly between
                      4 nodes).

    Returns:
        numpy.ndarray: Zero-padded state vector of length 4 * nx * ny.
    """
    grid_width, grid_height = grid_dims
    px, py = pos01
    px = float(max(0.0, min(1.0, px)))
    py = float(max(0.0, min(1.0, py)))

    gx = px * (grid_width - 1)
    gy = py * (grid_height - 1)
    grid_size = grid_width * grid_height
    field = np.zeros(grid_size)

    if snap_to_grid:
        # Nearest node (Python round(), i.e. ties go to the even integer),
        # clamped onto the grid.
        i0 = max(0, min(int(round(gx)), grid_width - 1))
        j0 = max(0, min(int(round(gy)), grid_height - 1))
        field[j0 * grid_width + i0] = 1.0
        return _embed_in_state(field, grid_size)

    # Bilinear interpolation mode (snap_to_grid=False).
    i0, j0 = int(np.floor(gx)), int(np.floor(gy))
    i1, j1 = min(i0 + 1, grid_width - 1), min(j0 + 1, grid_height - 1)
    dx, dy = gx - i0, gy - j0

    # "+=" matters: on the upper border i1 == i0 (or j1 == j0), so two
    # weights can land on the same node.
    field[j0 * grid_width + i0] += (1 - dx) * (1 - dy)
    field[j0 * grid_width + i1] += dx * (1 - dy)
    field[j1 * grid_width + i0] += (1 - dx) * dy
    field[j1 * grid_width + i1] += dx * dy

    return _embed_in_state(_normalize_to_unit(field), grid_size)


def create_gaussian_state_from_pos(grid_dims, mu01, sigma01, snap_to_grid=True):
    """
    Create a Gaussian initial state with center mu01=(x,y) and spreads sigma01=(sx,sy)
    given as fractions of the domain, then discretize to the grid given by grid_dims.

    Normalized mu and sigma are converted to grid units using (nx-1) and (ny-1).
    The center is clamped to [0, 1] before conversion.

    Args:
        grid_dims: Tuple (nx, ny) defining the simulation grid dimensions.
        mu01: Tuple (x, y) with Gaussian center in [0, 1].
        sigma01: Tuple (sx, sy) with Gaussian std dev as fraction of the domain.
        snap_to_grid: If True (default), snap the Gaussian center to the nearest
                      grid node before discretization.

    Returns:
        numpy.ndarray: Unit-norm Gaussian, zero-padded to length 4 * nx * ny.

    Raises:
        ValueError: If a sigma value is not positive.
    """
    grid_width, grid_height = grid_dims
    mu_x01, mu_y01 = mu01
    sig_x01, sig_y01 = sigma01

    mu_x01 = float(max(0.0, min(1.0, mu_x01)))
    mu_y01 = float(max(0.0, min(1.0, mu_y01)))
    sig_x01 = float(sig_x01)
    sig_y01 = float(sig_y01)
    if sig_x01 <= 0 or sig_y01 <= 0:
        raise ValueError("Sigma values (spread) must be positive.")

    mu_x = mu_x01 * (grid_width - 1)
    mu_y = mu_y01 * (grid_height - 1)
    if snap_to_grid:
        mu_x = float(max(0, min(int(round(mu_x)), grid_width - 1)))
        mu_y = float(max(0, min(int(round(mu_y)), grid_height - 1)))

    # Scale sigma by the axis extent, but never by 0: on a one-node axis
    # (n - 1 == 0) the old scaling produced sigma == 0 and a 0/0 = NaN state.
    # With the floor of 1 the exponent on that axis is simply 0.
    sigma_x = sig_x01 * max(grid_width - 1, 1)
    sigma_y = sig_y01 * max(grid_height - 1, 1)

    x = np.arange(0, grid_width)
    y = np.arange(0, grid_height)
    X, Y = np.meshgrid(x, y)
    gaussian_2d = np.exp(-((X - mu_x) ** 2) / (2 * sigma_x ** 2)
                         - ((Y - mu_y) ** 2) / (2 * sigma_y ** 2))

    field = _normalize_to_unit(gaussian_2d.ravel())
    return _embed_in_state(field, grid_width * grid_height)


# --- Simulation Code ---
def Wj_block(j, n, ctrl_state, theta, lam, name='Wj_block', xgate=False):
    """
    Build an (n + j)-qubit gate around a multi-controlled RZ(theta) on the last qubit.

    Sequence: CX fan-out from the last qubit onto qubits n..n+j-2 (only if j > 1),
    optional phase P(lam) and H on the last qubit, optional X gates, the
    controlled RZ, and then the same gates in reverse order with P(-lam).

    Args:
        j: Number of qubits after the first n; the last of them is the RZ target.
        n: Number of leading qubits.
        ctrl_state: Control bit-string for the RZ. For j > 1 it is prefixed with
                    "1" * (j - 1), so the total number of controls is
                    len(ctrl_state) + j - 1.
        theta: RZ rotation angle.
        lam: Phase angle; the P gates are skipped when lam == 0.
        name: Circuit name, also used as the gate label.
        xgate: Only used when j > 1. True applies X to qubits n..n+j-2; a
               list/tuple applies X to qubit n + idx for every truthy entry.

    Returns:
        The circuit converted to a gate labelled `name`.
    """
    qc = QuantumCircuit(n + j, name=name)
    target = n + j - 1

    def _apply_x_layer():
        # The same X layer is applied before and after the controlled RZ.
        if not (xgate and j > 1):
            return
        if isinstance(xgate, (list, tuple)):
            for idx, flag in enumerate(xgate):
                if flag:
                    qc.x(n + idx)
        elif xgate is True:
            qc.x(range(n, n + j - 1))

    if j > 1:
        qc.cx(target, range(n, n + j - 1))
    if lam != 0:
        qc.p(lam, target)
    qc.h(target)
    _apply_x_layer()

    if j > 1:
        mcrz = RZGate(theta).control(len(ctrl_state) + j - 1, ctrl_state="1" * (j - 1) + ctrl_state)
    else:
        mcrz = RZGate(theta).control(len(ctrl_state), ctrl_state=ctrl_state)
    qc.append(mcrz, range(0, n + j))

    _apply_x_layer()
    qc.h(target)
    if lam != 0:
        qc.p(-lam, target)
    if j > 1:
        qc.cx(target, range(n, n + j - 1))
    return qc.to_gate(label=name)


def V1(nx, dt):
    """
    Assemble the first per-step circuit from four Wj_block gates (all with lam = 0).

    The circuit acts on 2 * n "derivatives" qubits plus 2 "blocks" qubits,
    where n = ceil(log2(nx)). run_sim uses it as `expA1`.
    NOTE(review): presumably the exponential of one part of the split
    operator; the underlying operator is not visible in this file.

    Args:
        nx: Grid points per axis.
        dt: Time step used as the rotation angle (with sign) in each block.

    Returns:
        QuantumCircuit on (derivatives, blocks).
    """
    n = int(np.ceil(np.log2(nx)))
    derivatives, blocks = QuantumRegister(2 * n), QuantumRegister(2)
    qc = QuantumCircuit(derivatives, blocks)

    qc.append(Wj_block(2, n, "0" * n, -dt, 0, xgate=True),
              list(derivatives[0:n]) + list(blocks[:]))
    qc.append(Wj_block(3, n - 1, "1" * (n - 1), dt, 0, xgate=[0, 1]),
              list(derivatives[1:n]) + [derivatives[0]] + list(blocks[:]))
    qc.append(Wj_block(1, n + 1, "0" * (n + 1), dt, 0, xgate=True),
              list(derivatives[n:2 * n]) + list(blocks[:]))
    qc.append(Wj_block(2, n, "0" + "1" * (n - 1), -dt, 0, xgate=False),
              list(derivatives[n + 1:2 * n]) + [blocks[0]] + [derivatives[n]] + [blocks[1]])
    return qc


def V2(nx, dt):
    """
    Assemble the second per-step circuit from Wj_block gates (all with lam = -pi/2).

    Same register layout as V1: 2 * n "derivatives" qubits plus 2 "blocks"
    qubits, n = ceil(log2(nx)). run_sim uses it as `expA2`.
    NOTE(review): presumably the exponential of the other part of the split
    operator; the underlying operator is not visible in this file.

    Args:
        nx: Grid points per axis.
        dt: Time step; blocks use +/-dt or +/-2*dt as rotation angle.

    Returns:
        QuantumCircuit on (derivatives, blocks).
    """
    n = int(np.ceil(np.log2(nx)))
    derivatives, blocks = QuantumRegister(2 * n), QuantumRegister(2)
    qc = QuantumCircuit(derivatives, blocks)

    # Blocks acting on the first n derivative qubits.
    qc.append(Wj_block(2, 0, "", -2 * dt, -np.pi / 2, xgate=True), blocks[:])
    for j in range(1, n + 1):
        qc.append(Wj_block(2 + j, 0, "", 2 * dt, -np.pi / 2, xgate=[1] * (j - 1) + [0, 1]),
                  list(derivatives[0:j]) + list(blocks[:]))
    qc.append(Wj_block(2, n, "0" * n, -dt, -np.pi / 2, xgate=True),
              list(derivatives[0:n]) + list(blocks[:]))
    qc.append(Wj_block(2, n, "1" * n, 2 * dt, -np.pi / 2, xgate=True),
              list(derivatives[0:n]) + list(blocks[:]))
    qc.append(Wj_block(3, n - 1, "1" * (n - 1), dt, -np.pi / 2, xgate=[0, 1]),
              list(derivatives[1:n]) + [derivatives[0]] + list(blocks[:]))

    # Blocks acting on the second n derivative qubits.
    qc.append(Wj_block(1, 1, "0", 2 * dt, -np.pi / 2, xgate=False), blocks[:])
    for j in range(1, n + 1):
        qc.append(Wj_block(1 + j, 1, "0", -2 * dt, -np.pi / 2, xgate=[1] * (j - 1)),
                  [blocks[0]] + list(derivatives[n:n + j]) + [blocks[1]])
    qc.append(Wj_block(1, n + 1, "0" * (n + 1), dt, -np.pi / 2, xgate=False),
              list(derivatives[n:2 * n]) + list(blocks[:]))
    qc.append(Wj_block(1, n + 1, "0" + "1" * n, -2 * dt, -np.pi / 2, xgate=False),
              list(derivatives[n:2 * n]) + list(blocks[:]))
    qc.append(Wj_block(2, n, "0" + "1" * (n - 1), -dt, -np.pi / 2, xgate=False),
              list(derivatives[n + 1:2 * n]) + [blocks[0]] + [derivatives[n]] + [blocks[1]])
    return qc


def _solver_step_count(total_time, dt):
    """Return floor(total_time / dt), or None if total_time is not a finite number > 0."""
    try:
        value = float(total_time)
    except (TypeError, ValueError, OverflowError):
        return None
    ratio = value / dt
    # NaN/inf would make int() raise; treat them like any other invalid input.
    if not math.isfinite(ratio) or value <= 0:
        return None
    return math.floor(ratio)


def _snapshot_stride(snapshot_interval, dt):
    """Return the number of solver steps between snapshots (>= 1).

    None, non-numeric, non-finite and too-small intervals all yield 1,
    i.e. one snapshot per solver step.
    """
    try:
        ratio = float(snapshot_interval) / dt
    except (TypeError, ValueError, OverflowError):
        return 1
    if not math.isfinite(ratio):
        return 1
    return max(1, int(round(ratio)))


def _snapshot_step_marks(steps, stride):
    """Return solver step indices 0, stride, 2*stride, ... <= steps, always ending at steps."""
    marks = list(range(0, steps + 1, stride))
    if marks[-1] != steps:
        marks.append(steps)
    return marks


def run_sim(nx, na, R, initial_state, T, snapshot_dt=None, stop_check=None, progress_callback=None, print_callback=None):
    """
    Runs the quantum simulation with a fixed solver step dt=0.1.

    Frames are captured only at snapshot times [0, D, 2D, ..., <= T_eff], always
    including t=0 and the final solver-aligned time T_eff = floor(T/dt)*dt.
    D is snapshot_dt rounded to a whole number of solver steps (at least one).

    Args:
        nx: Grid points per axis; the system register has 2*ceil(log2(nx)) + 2 qubits.
        na: Number of ancilla qubits.
        R: Half-width factor of the ancilla momentum grid [-R*pi, R*pi).
        initial_state: Amplitude vector for the system register, passed to
                       StatePreparation.
        T: Requested total time. Non-numeric, non-finite, non-positive or
           shorter-than-dt values yield empty results.
        snapshot_dt: Requested snapshot interval; None or invalid values mean
                     one snapshot per solver step.
        stop_check: Optional callable; a truthy return value stops the loop
                    before the next solver step.
        progress_callback: Optional callable receiving a percentage. Errors it
                           raises are ignored on purpose so that a UI problem
                           cannot abort the simulation.
        print_callback: Optional callable for log messages (default: print).

    Returns:
        frames (np.ndarray), snapshot_times (np.ndarray). After an early stop
        both contain only the snapshots captured so far.
    """

    def _log(msg):
        if print_callback:
            print_callback(msg)
        else:
            print(msg)

    def _report_progress(percent):
        if not progress_callback:
            return
        try:
            progress_callback(percent)
        except Exception:
            # Deliberate best-effort: progress reporting must not kill the run.
            pass

    dt = 0.1

    steps = _solver_step_count(T, dt)
    if steps is None or steps <= 0:
        return np.array([]), np.array([])
    T_eff = steps * dt

    # Work with integer step indices instead of comparing float times, so a
    # rounding mismatch can never cause a snapshot (and all later ones) to be
    # skipped.
    stride = _snapshot_stride(snapshot_dt, dt)
    snapshot_dt_eff = stride * dt
    snapshot_steps = _snapshot_step_marks(steps, stride)
    target_times = [min(round(s * dt, 12), T_eff) for s in snapshot_steps]

    # Setup circuit
    nq = int(np.ceil(np.log2(nx)))
    dp = 2 * R * np.pi / 2 ** na
    p = np.arange(-R * np.pi, R * np.pi, step=dp)
    fp = np.exp(-np.abs(p))

    system, ancilla = QuantumRegister(2 * nq + 2), QuantumRegister(na)
    qc = QuantumCircuit(system, ancilla)
    qc.append(StatePreparation(initial_state), system)
    qc.append(StatePreparation(fp / np.linalg.norm(fp)), ancilla)

    expA1 = V1(nx, dt).to_gate()
    expA2 = V2(nx, dt)

    def _capture_frame():
        # Real part of the statevector, one row per ancilla basis state;
        # the row with index 2**(na-1) is the one kept as the frame.
        amplitudes = np.real(Statevector(qc)).reshape(2 ** na, 2 ** (2 * nq + 2))
        return amplitudes[2 ** (na - 1)]

    frames = [_capture_frame()]  # frame at t=0
    next_idx = 1  # next snapshot_steps index to capture

    _log(f"Starting simulation: T={T_eff:.2f}s, steps={steps}, snapshot_dt={snapshot_dt_eff:.2f}s")

    for i in range(steps):
        if stop_check and stop_check():
            _log(f"Simulation interrupted at step {i}/{steps}")
            break

        # One solver step
        qc.append(QFTGate(na), ancilla)
        qc.x(ancilla[-1])
        for j in range(na - 1):
            qc.append(expA1.control().repeat(2 ** j), [ancilla[j]] + system[:])
        qc.append(expA1.inverse().control(ctrl_state="0").repeat(2 ** (na - 1)),
                  [ancilla[na - 1]] + system[:])
        qc.append(expA2, system[:])
        qc.x(ancilla[-1])
        qc.append(QFTGate(na).inverse(), ancilla)

        if next_idx < len(snapshot_steps) and (i + 1) == snapshot_steps[next_idx]:
            frames.append(_capture_frame())
            next_idx += 1

        _report_progress(((i + 1) / steps) * 100)

    _report_progress(100.0)
    _log("Simulation completed.")

    # Keep snapshot_times aligned with the captured frames (covers early stop).
    frames_arr = np.asarray(frames)
    times_arr = np.asarray(target_times[: len(frames_arr)])
    return frames_arr, times_arr


def create_impulse_preview_state(preview_n: int, pos01, sigma01: float = 0.02):
    """
    Smooth delta-like preview on a preview_n x preview_n grid using a narrow Gaussian.

    Preview-only helper, independent of the simulation grid size (nx). It
    delegates to create_gaussian_state_from_pos with its default
    snap_to_grid=True, so the center sits on a node of the preview grid.

    Args:
        preview_n: Number of preview grid points per axis.
        pos01: (x, y) center in [0, 1].
        sigma01: Spread as a fraction of the domain; missing, non-numeric or
                 non-positive values fall back to 0.02.

    Returns:
        numpy.ndarray: Zero-padded state vector of length 4 * preview_n**2.
    """
    try:
        sx = float(sigma01) if sigma01 and sigma01 > 0 else 0.02
    except (TypeError, ValueError):
        sx = 0.02
    return create_gaussian_state_from_pos(
        (int(preview_n), int(preview_n)),
        (float(pos01[0]), float(pos01[1])),
        (sx, sx),
    )


##### Statevector Estimator Simulation Code Below #####
# Kept at this position on purpose: a star import rebinds any same-named
# definitions above it, so moving it would change which definitions win.
from .base_functions import *


def create_time_frames(total_time, snapshot_interval):
    """
    Return the snapshot times for a run of length total_time on the dt=0.1 solver grid.

    Args:
        total_time: Requested total time.
        snapshot_interval: Requested spacing between snapshots; it is rounded
                           to a whole number of solver steps (at least one).
                           Invalid values mean one snapshot per solver step.

    Returns:
        list[float]: [] if total_time is not a finite number > 0, [0.0] if it
        is shorter than one solver step, otherwise 0, D, 2D, ... followed by
        the solver-aligned end time floor(total_time/dt)*dt. Values are
        rounded to 12 decimals.
    """
    dt = 0.1

    steps = _solver_step_count(total_time, dt)
    if steps is None:
        return []
    if steps <= 0:
        return [0.0]

    stride = _snapshot_stride(snapshot_interval, dt)
    # Integer step marks avoid the endpoint ambiguity of a float np.arange.
    return [round(s * dt, 12) for s in _snapshot_step_marks(steps, stride)]


def run_sve(field, x, y, T, snapshot_time, nx, initial_state, impulse_pos,
            progress_callback=None, print_callback=None):
    """Statevector Estimator for time-series field values.

    Supports both single-point and multi-point modes.
    - Single-point (backward compatible): x, y are integers; returns list[float].
    - Multi-point: x is a list/tuple of (ix, iy) integer pairs and y is None;
      returns dict[(ix,iy) -> list[float]]. Repeated points are evaluated once.

    Args:
        field: Field name to estimate; 'Ez' at the impulse position takes its
               sign directly from the reference value.
        x, y: Monitor point(s), see above.
        T: Requested total time.
        snapshot_time: Requested spacing between samples.
        nx: Grid points per axis.
        initial_state: Currently unused, see the note in the body.
        impulse_pos: (x, y) grid position of the impulse; also the reference
                     point used for the sign.
        progress_callback: Optional callable receiving a percentage per frame.
        print_callback: Optional callable for log messages (default: print).

    Returns:
        list[float] or dict[(ix, iy) -> list[float]], one value per time frame.
    """

    def _log(msg):
        if print_callback:
            print_callback(msg)
        else:
            print(msg)

    na = 1
    dt = 0.1
    R = 4
    nq = int(np.ceil(np.log2(nx)))

    # Normalize monitor points input
    multi = isinstance(x, (list, tuple)) and y is None
    if multi:
        # Drop repeats (keeping order): a repeated point would otherwise be
        # appended twice per frame to the same dict entry, making that series
        # longer than the others.
        points = list(dict.fromkeys(tuple(map(int, pt)) for pt in x))
    else:
        points = [(int(x), int(y))]

    xref, yref = impulse_pos
    offset = 0
    grid_dims = (nx, nx)
    # NOTE(review): the initial_state argument is discarded here and replaced
    # by a delta impulse at impulse_pos. Kept as-is; confirm this is intended.
    initial_state = create_impulse_state(grid_dims, impulse_pos)

    dp = 2 * R * np.pi / 2**na
    p = np.arange(- R * np.pi, R * np.pi, step=dp)
    fp = np.exp(-np.abs(p))
    norm = np.linalg.norm(fp)

    time_frames = create_time_frames(T, snapshot_time)
    total_frames = len(time_frames)
    _log(f"Starting QPU simulation: T={T}s, frames={total_frames}, points={len(points)}")

    series_by_point = {point: [] for point in points}

    for idx, frame_time in enumerate(time_frames):
        # Frame times are multiples of dt up to rounding; round() recovers
        # the step index without being sensitive to the rounding direction.
        steps = int(round(frame_time / dt))

        # Reference Ez field at impulse location for sign
        Eref = Eref_value(nx, nq, R, dt, na, steps, xref, yref, field_ref='Ez')

        for (px, py) in points:
            circ_magnitude = circ_for_magnitude(field, px, py, nx, na, R, dt, initial_state, steps)
            magnitude = get_absolute_field_value(circ_magnitude, nq, na, offset, norm)

            if field == 'Ez' and px == xref and py == yref:
                field_value = -magnitude if Eref < 0 else magnitude
            else:
                circsum, circdiff = circuits_for_sign(field, px, py, nx, na, dt, R, initial_state,
                                                      steps, xref, yref, field_ref='Ez')
                sign = get_relative_sign(circsum, circdiff, nq, na)
                if (sign == 'same' and Eref > 0) or (sign == 'different' and Eref < 0):
                    field_value = magnitude
                else:
                    field_value = -magnitude

            series_by_point[(px, py)].append(field_value)

        # Calculate and report progress
        pct = (idx + 1) / total_frames * 100
        if progress_callback:
            progress_callback(pct)
        _log(f"SVE Progress: {int(pct)}% (frame {idx + 1}/{total_frames})")

    _log("Statevector Estimator simulation completed.")
    return series_by_point if multi else series_by_point[points[0]]