# quantum/utils/delta_impulse_generator.py
# harishaseebat92
# EM SVE Log progress percentage
# e2e1a8c
import numpy as np
import math
from qiskit.circuit import QuantumCircuit, QuantumRegister
from qiskit.circuit.library import StatePreparation, QFTGate, RZGate
from qiskit.quantum_info import Statevector
import pyvista as pv
def create_impulse_state(grid_dims, impulse_pos):
    """
    Build the padded initial state holding a single unit impulse on the grid.

    The (x, y) node is mapped to a row-major flat index (y * width + x) and
    the resulting vector is zero-padded to four times the number of grid
    cells, which is the length the rest of this module uses for the state.

    Args:
        grid_dims (tuple): (width, height) of the simulation grid.
        impulse_pos (tuple): (x, y) node of the impulse, 0-indexed. The values
            are used directly as an array index, so they must be integers.

    Returns:
        numpy.ndarray: 1D array of length 4 * width * height that is zero
        everywhere except for a single 1 at the impulse index.

    Raises:
        ValueError: If the impulse position is outside the grid dimensions.
    """
    grid_width, grid_height = grid_dims
    impulse_x, impulse_y = impulse_pos

    # Reject positions that do not fall on the grid.
    on_grid = (0 <= impulse_x < grid_width) and (0 <= impulse_y < grid_height)
    if not on_grid:
        raise ValueError(f"Impulse position ({impulse_x}, {impulse_y}) is outside the "
                         f"grid dimensions ({grid_width}x{grid_height}).")

    # Allocate the full (4x padded) vector and light up the one entry.
    state = np.zeros(4 * (grid_width * grid_height))
    state[impulse_y * grid_width + impulse_x] = 1
    return state
def create_gaussian_state(grid_dims, mu, sigma):
    """
    Build the padded initial state for a 2D Gaussian given in grid units.

    The Gaussian is sampled on every grid node, scaled to unit L2 norm,
    flattened row-major and zero-padded to four times the grid size.

    Args:
        grid_dims (tuple): (width, height) of the grid.
        mu (tuple): (mu_x, mu_y) centre of the Gaussian, in grid coordinates.
        sigma (tuple): (sigma_x, sigma_y) standard deviations, in grid units.

    Returns:
        numpy.ndarray: 1D array of length 4 * width * height whose first
        width * height entries hold the normalized Gaussian.

    Raises:
        ValueError: If sigma values are not positive.
    """
    grid_width, grid_height = grid_dims
    mu_x, mu_y = mu
    sigma_x, sigma_y = sigma
    if sigma_x <= 0 or sigma_y <= 0:
        raise ValueError("Sigma values (spread) must be positive.")

    # Node coordinates; meshgrid yields arrays of shape (height, width).
    cols, rows = np.meshgrid(np.arange(0, grid_width), np.arange(0, grid_height))

    # Separable Gaussian exponent along each axis.
    x_term = (cols - mu_x) ** 2 / (2 * sigma_x ** 2)
    y_term = (rows - mu_y) ** 2 / (2 * sigma_y ** 2)
    profile = np.exp(-x_term - y_term)

    # Scale to unit L2 norm unless the profile is identically zero.
    magnitude = np.linalg.norm(profile)
    if magnitude > 0:
        profile = profile / magnitude

    # Row-major flatten, then append zeros up to the 4x padded length.
    flat = profile.flatten()
    cell_count = grid_width * grid_height
    padding = np.zeros(4 * cell_count - cell_count, dtype=flat.dtype)
    return np.concatenate((flat, padding))
# --- New: Continuous-position helpers for excitation before meshing ---
def _normalize_to_unit(vec: np.ndarray) -> np.ndarray:
    """Return ``vec`` scaled to unit L2 norm, or ``vec`` itself if its norm is not positive."""
    length = np.linalg.norm(vec)
    if length > 0:
        return vec / length
    # Zero (or non-comparable) norm: nothing sensible to divide by.
    return vec
def create_impulse_state_from_pos(grid_dims, pos01, snap_to_grid=True):
    """
    Create a delta-like initial state from a continuous position in the unit square.

    The position is clamped to [0, 1] per axis and mapped onto the nx x ny
    grid by multiplying with (nx - 1) and (ny - 1). The resulting field is
    written into the first nx * ny entries of a zero vector of length
    4 * nx * ny.

    Args:
        grid_dims: Tuple (nx, ny) defining the simulation grid dimensions.
        pos01: Tuple (x, y) with the continuous position; values outside
            [0, 1] are clamped.
        snap_to_grid: If True (default), the impulse is placed on the nearest
            grid node (via ``round``) with amplitude exactly 1.0. If False,
            the amplitude is spread over the four surrounding nodes using
            bilinear weights and the field is then rescaled to unit L2 norm,
            so the peak amplitude depends on the position (e.g. 0.5 when the
            point sits exactly between four nodes).

    Returns:
        numpy.ndarray: The padded state vector of length 4 * nx * ny.

    The preview uses create_impulse_preview_state(), which renders a smooth
    bump on its own grid instead of this discretization.
    """
    def _clamp01(value):
        return float(max(0.0, min(1.0, value)))

    grid_width, grid_height = grid_dims
    px, py = pos01
    px, py = _clamp01(px), _clamp01(py)

    last_col = grid_width - 1
    last_row = grid_height - 1
    gx = px * last_col
    gy = py * last_row

    cell_count = grid_width * grid_height
    field = np.zeros(cell_count)

    if snap_to_grid:
        # Nearest node, clamped to the grid; exact delta with peak = 1.0.
        col = max(0, min(int(round(gx)), last_col))
        row = max(0, min(int(round(gy)), last_row))
        field[row * grid_width + col] = 1.0  # Peak value = 1.0
    else:
        # Bilinear weights on the cell that contains (gx, gy). The upper
        # neighbour is clamped so positions on the far edge stay in range.
        col_lo, row_lo = int(np.floor(gx)), int(np.floor(gy))
        col_hi = min(col_lo + 1, last_col)
        row_hi = min(row_lo + 1, last_row)
        frac_x, frac_y = gx - col_lo, gy - row_lo
        contributions = (
            (row_lo, col_lo, (1 - frac_x) * (1 - frac_y)),
            (row_lo, col_hi, frac_x * (1 - frac_y)),
            (row_hi, col_lo, (1 - frac_x) * frac_y),
            (row_hi, col_hi, frac_x * frac_y),
        )
        for row, col, weight in contributions:
            field[row * grid_width + col] += weight
        field = _normalize_to_unit(field)

    state = np.zeros(4 * cell_count)
    state[:cell_count] = field
    return state
def create_gaussian_state_from_pos(grid_dims, mu01, sigma01, snap_to_grid=True):
    """
    Create a Gaussian initial state from a centre and spreads given as domain fractions.

    The centre is clamped to [0, 1] per axis; centre and spreads are converted
    to grid units by multiplying with (nx - 1) and (ny - 1). The sampled
    Gaussian is rescaled to unit L2 norm and stored in the first nx * ny
    entries of a zero vector of length 4 * nx * ny.

    Args:
        grid_dims: Tuple (nx, ny) defining the simulation grid dimensions.
        mu01: Tuple (x, y) with the Gaussian centre; clamped to [0, 1].
        sigma01: Tuple (sx, sy) with the standard deviations as fractions of
            the domain.
        snap_to_grid: If True (default), the centre is rounded to the nearest
            grid node (and clamped to the grid) before sampling.

    Returns:
        numpy.ndarray: The padded state vector of length 4 * nx * ny.

    Raises:
        ValueError: If either sigma value is not positive.
    """
    grid_width, grid_height = grid_dims
    mu_x01, mu_y01 = mu01
    sig_x01, sig_y01 = sigma01

    mu_x01 = float(max(0.0, min(1.0, mu_x01)))
    mu_y01 = float(max(0.0, min(1.0, mu_y01)))
    sig_x01, sig_y01 = float(sig_x01), float(sig_y01)
    if sig_x01 <= 0 or sig_y01 <= 0:
        raise ValueError("Sigma values (spread) must be positive.")

    last_col = grid_width - 1
    last_row = grid_height - 1

    # Fractions of the domain -> grid units.
    mu_x = mu_x01 * last_col
    mu_y = mu_y01 * last_row
    if snap_to_grid:
        # Round to the nearest node and keep the result on the grid.
        mu_x = float(max(0, min(int(round(mu_x)), last_col)))
        mu_y = float(max(0, min(int(round(mu_y)), last_row)))
    sigma_x = sig_x01 * last_col
    sigma_y = sig_y01 * last_row

    # Sample the Gaussian on every node; arrays have shape (ny, nx).
    cols, rows = np.meshgrid(np.arange(0, grid_width), np.arange(0, grid_height))
    x_sq = (cols - mu_x) ** 2
    y_sq = (rows - mu_y) ** 2
    profile = np.exp(-x_sq / (2 * sigma_x ** 2) - y_sq / (2 * sigma_y ** 2))

    cell_count = grid_width * grid_height
    state = np.zeros(4 * cell_count)
    state[:cell_count] = _normalize_to_unit(profile.ravel())
    return state
# --- Circuit building blocks and statevector simulation ---
def Wj_block(j, n, ctrl_state, theta, lam, name='Wj_block', xgate=False):
    """
    Build an (n + j)-qubit gate around a multi-controlled RZ(theta).

    The circuit applies, in order: CX from the last qubit onto qubits
    n .. n+j-2 (only when j > 1), an optional phase P(lam) and an H on the
    last qubit, an optional X layer, the controlled RZ across all qubits,
    and then the same layers again in reverse order (with P(-lam)).

    Args:
        j: Number of qubits above the first ``n``; the last of them
            (index n + j - 1) receives the H / P gates.
        n: Number of leading qubits.
        ctrl_state: Control-state bit string for the RZ controls; when j > 1
            it is extended by ``j - 1`` leading "1" characters.
        theta: Rotation angle of the RZ gate.
        lam: Phase angle; the P gates are skipped when it equals 0.
        name: Circuit name, also used as the label of the returned gate.
        xgate: X-layer selector, honoured only when j > 1. A list/tuple
            applies X to qubit ``n + idx`` for every truthy entry; the literal
            ``True`` applies X to qubits n .. n+j-2; other values apply nothing.

    Returns:
        The circuit converted with ``to_gate(label=name)``.
    """
    top = n + j - 1
    inner = range(n, top)
    has_inner = j > 1
    qc = QuantumCircuit(n + j, name=name)

    def _toggle_x():
        # The X layer is applied identically before and after the rotation.
        if not (xgate and has_inner):
            return
        if isinstance(xgate, (list, tuple)):
            for offset, enabled in enumerate(xgate):
                if enabled:
                    qc.x(n + offset)
        elif xgate is True:
            qc.x(inner)

    # Opening layers.
    if has_inner:
        qc.cx(top, inner)
    if lam != 0:
        qc.p(lam, top)
    qc.h(top)
    _toggle_x()

    # Controlled rotation: the inner qubits add j - 1 controls that must be 1.
    extra_controls = j - 1 if has_inner else 0
    mcrz = RZGate(theta).control(
        len(ctrl_state) + extra_controls,
        ctrl_state="1" * extra_controls + ctrl_state,
    )
    qc.append(mcrz, range(0, n + j))

    # Closing layers mirror the opening ones.
    _toggle_x()
    qc.h(top)
    if lam != 0:
        qc.p(-lam, top)
    if has_inner:
        qc.cx(top, inner)

    return qc.to_gate(label=name)
def V1(nx, dt):
    """
    Assemble the V1 circuit from four Wj_block gates.

    The circuit acts on a register of 2 * n qubits (n = ceil(log2(nx))) plus
    a 2-qubit register. All blocks use lam = 0 and rotation angles of +/- dt.

    Args:
        nx: Grid size per axis; sets n = ceil(log2(nx)).
        dt: Time step used as the rotation angle.

    Returns:
        QuantumCircuit: The assembled circuit (not converted to a gate).
    """
    n = int(np.ceil(np.log2(nx)))
    derivatives = QuantumRegister(2 * n)
    blocks = QuantumRegister(2)
    qc = QuantumCircuit(derivatives, blocks)

    first_half = list(derivatives[0:n])
    second_half = list(derivatives[n:2 * n])
    block_bits = list(blocks[:])

    # (gate, qubits) pairs, appended in this exact order.
    schedule = [
        (
            Wj_block(2, n, "0" * n, -dt, 0, xgate=True),
            first_half + block_bits,
        ),
        (
            Wj_block(3, n - 1, "1" * (n - 1), dt, 0, xgate=[0, 1]),
            list(derivatives[1:n]) + [derivatives[0]] + block_bits,
        ),
        (
            Wj_block(1, n + 1, "0" * (n + 1), dt, 0, xgate=True),
            second_half + block_bits,
        ),
        (
            Wj_block(2, n, "0" + "1" * (n - 1), -dt, 0, xgate=False),
            list(derivatives[n + 1:2 * n]) + [blocks[0]] + [derivatives[n]] + [blocks[1]],
        ),
    ]
    for gate, qubits in schedule:
        qc.append(gate, qubits)
    return qc
def V2(nx, dt):
    """
    Assemble the V2 circuit from a sequence of Wj_block gates.

    The circuit acts on a register of 2 * n qubits (n = ceil(log2(nx))) plus
    a 2-qubit register. Every block uses lam = -pi/2 and rotation angles that
    are +/- dt or +/- 2 * dt.

    Args:
        nx: Grid size per axis; sets n = ceil(log2(nx)).
        dt: Time step used to scale the rotation angles.

    Returns:
        QuantumCircuit: The assembled circuit (not converted to a gate).
    """
    n = int(np.ceil(np.log2(nx)))
    derivatives = QuantumRegister(2 * n)
    blocks = QuantumRegister(2)
    qc = QuantumCircuit(derivatives, blocks)

    lam = -np.pi / 2
    first_half = list(derivatives[0:n])
    second_half = list(derivatives[n:2 * n])
    block_bits = list(blocks[:])

    # --- Blocks involving the first n register qubits ---
    qc.append(Wj_block(2, 0, "", -2 * dt, lam, xgate=True), blocks[:])
    for j in range(1, n + 1):
        gate = Wj_block(2 + j, 0, "", 2 * dt, lam, xgate=[1] * (j - 1) + [0, 1])
        qc.append(gate, list(derivatives[0:j]) + block_bits)
    qc.append(Wj_block(2, n, "0" * n, -dt, lam, xgate=True), first_half + block_bits)
    qc.append(Wj_block(2, n, "1" * n, 2 * dt, lam, xgate=True), first_half + block_bits)
    qc.append(
        Wj_block(3, n - 1, "1" * (n - 1), dt, lam, xgate=[0, 1]),
        list(derivatives[1:n]) + [derivatives[0]] + block_bits,
    )

    # --- Blocks involving the last n register qubits ---
    qc.append(Wj_block(1, 1, "0", 2 * dt, lam, xgate=False), blocks[:])
    for j in range(1, n + 1):
        gate = Wj_block(1 + j, 1, "0", -2 * dt, lam, xgate=[1] * (j - 1))
        qc.append(gate, [blocks[0]] + list(derivatives[n:n + j]) + [blocks[1]])
    qc.append(Wj_block(1, n + 1, "0" * (n + 1), dt, lam, xgate=False), second_half + block_bits)
    qc.append(Wj_block(1, n + 1, "0" + "1" * n, -2 * dt, lam, xgate=False), second_half + block_bits)
    qc.append(
        Wj_block(2, n, "0" + "1" * (n - 1), -dt, lam, xgate=False),
        list(derivatives[n + 1:2 * n]) + [blocks[0]] + [derivatives[n]] + [blocks[1]],
    )
    return qc
def run_sim(nx, na, R, initial_state, T, snapshot_dt=None, stop_check=None, progress_callback=None, print_callback=None):
    """
    Runs the quantum simulation for electromagnetic scattering with fixed dt=0.1.

    Frames are captured only at snapshot times [0, Δt, 2Δt, ..., ≤ T_eff],
    always including t=0 and the final solver-aligned time
    T_eff = floor(T/dt)*dt. The floor is taken with a small tolerance so that
    durations that are multiples of dt are not cut short by floating-point
    error (0.3 / 0.1 evaluates to 2.9999999999999996).

    Args:
        nx: Grid size per axis; the system register has 2*ceil(log2(nx)) + 2 qubits.
        na: Number of ancilla qubits.
        R: Scale of the ancilla grid p in [-R*pi, R*pi).
        initial_state: Amplitude vector loaded into the system register.
        T: Requested total time. Non-numeric, non-finite, non-positive or
            shorter-than-dt values yield empty results.
        snapshot_dt: Requested snapshot spacing. It is rounded to a whole
            number of solver steps (at least one); None, non-numeric or
            non-finite values fall back to dt.
        stop_check: Optional zero-argument callable polled before every step;
            a truthy result stops the run early.
        progress_callback: Optional callable receiving the progress in percent.
            Exceptions raised by it are ignored.
        print_callback: Optional callable receiving log messages; ``print``
            is used when it is not given.

    Returns:
        frames (np.ndarray), snapshot_times (np.ndarray): one row per captured
        frame and the matching times. After an early stop only the frames
        captured so far (and their times) are returned.
    """
    def _log(msg):
        if print_callback:
            print_callback(msg)
        else:
            print(msg)

    def _report(percent):
        # Best-effort: a failing progress callback must never abort the run.
        if progress_callback:
            try:
                progress_callback(percent)
            except Exception:
                pass

    dt = 0.1
    tol = 1e-9

    # --- Validate total time and compute the number of solver steps ---
    try:
        T_val = float(T)
    except Exception:
        return np.array([]), np.array([])
    if not np.isfinite(T_val) or T_val <= 0:
        return np.array([]), np.array([])
    # The tolerance keeps grid-aligned durations (0.3, 0.6, 0.7, ...) from
    # losing their last step to floating-point division error.
    steps = int(np.floor(T_val / dt + tol))
    if steps <= 0:
        return np.array([]), np.array([])
    T_eff = steps * dt

    # --- Snapshot spacing as a whole number of solver steps ---
    snapshot_dt_val = dt
    if snapshot_dt is not None:
        try:
            snapshot_dt_val = float(snapshot_dt)
        except Exception:
            snapshot_dt_val = dt
    if not np.isfinite(snapshot_dt_val) or snapshot_dt_val < dt:
        snapshot_dt_val = dt
    k = max(1, int(round(snapshot_dt_val / dt)))
    snapshot_dt_eff = k * dt

    # Snapshot schedule in integer step indices: no float comparisons are
    # needed later to decide when to capture. The final step is always included.
    target_steps = list(range(0, steps + 1, k))
    if target_steps[-1] != steps:
        target_steps.append(steps)
    target_times = [round(step * dt, 12) for step in target_steps]

    # --- Setup circuit ---
    nq = int(np.ceil(np.log2(nx)))
    dp = 2 * R * np.pi / 2 ** na
    p = np.arange(-R * np.pi, R * np.pi, step=dp)
    fp = np.exp(-np.abs(p))
    system, ancilla = QuantumRegister(2 * nq + 2), QuantumRegister(na)
    qc = QuantumCircuit(system, ancilla)
    qc.append(StatePreparation(initial_state), system)
    qc.append(StatePreparation(fp / np.linalg.norm(fp)), ancilla)
    expA1 = V1(nx, dt).to_gate()
    expA2 = V2(nx, dt)

    # Loop-invariant gates: build them once instead of once per solver step.
    qft = QFTGate(na)
    qft_inverse = QFTGate(na).inverse()
    controlled_powers = [expA1.control().repeat(2 ** j) for j in range(na - 1)]
    controlled_inverse_power = expA1.inverse().control(ctrl_state="0").repeat(2 ** (na - 1))

    def _capture():
        # Real part of the statevector, viewed as (ancilla index, system index);
        # the frame is the row at ancilla index 2**(na - 1).
        amplitudes = np.real(Statevector(qc)).reshape(2 ** na, 2 ** (2 * nq + 2))
        return amplitudes[2 ** (na - 1)]

    # Capture initial frame at t=0
    frames = [_capture()]
    next_idx = 1  # next target_steps index to capture

    _log(f"Starting simulation: T={T_eff:.2f}s, steps={steps}, snapshot_dt={snapshot_dt_eff:.2f}s")
    for i in range(steps):
        if stop_check and stop_check():
            _log(f"Simulation interrupted at step {i}/{steps}")
            break

        # One solver step
        qc.append(qft, ancilla)
        qc.x(ancilla[-1])
        for j, gate in enumerate(controlled_powers):
            qc.append(gate, [ancilla[j]] + system[:])
        qc.append(controlled_inverse_power, [ancilla[na - 1]] + system[:])
        qc.append(expA2, system[:])
        qc.x(ancilla[-1])
        qc.append(qft_inverse, ancilla)

        if next_idx < len(target_steps) and (i + 1) == target_steps[next_idx]:
            frames.append(_capture())
            next_idx += 1

        _report(((i + 1) / steps) * 100)

    _report(100.0)
    _log("Simulation completed.")

    # Ensure snapshot_times align with number of captured frames (covers early stop)
    frames_arr = np.asarray(frames)
    times_arr = np.asarray(target_times[: len(frames_arr)])
    return frames_arr, times_arr
def create_impulse_preview_state(preview_n: int, pos01, sigma01: float = 0.02):
    """
    Smooth delta-like preview on a unit square using a narrow Gaussian.

    Preview-only helper, independent of the simulation grid size (nx): it
    delegates to create_gaussian_state_from_pos() on a preview_n x preview_n
    grid with the same spread on both axes. Use the *_from_pos() variants for
    the actual simulation.

    Args:
        preview_n: Side length of the square preview grid.
        pos01: (x, y) centre of the bump as fractions of the domain.
        sigma01: Spread as a fraction of the domain. Missing, non-positive or
            non-numeric values fall back to 0.02.

    Returns:
        numpy.ndarray: The padded state vector produced by
        create_gaussian_state_from_pos().
    """
    default_spread = 0.02
    try:
        spread = float(sigma01) if sigma01 and sigma01 > 0 else default_spread
    except Exception:
        spread = default_spread

    preview_dims = (int(preview_n), int(preview_n))
    centre = (float(pos01[0]), float(pos01[1]))
    # NOTE(review): snap_to_grid is left at its default (True), so the centre
    # is rounded to the nearest preview node - confirm this is intended.
    return create_gaussian_state_from_pos(preview_dims, centre, (spread, spread))
##### Statevector Estimator Simulation Code Below #####
from .base_functions import *
def create_time_frames(total_time, snapshot_interval):
    """
    Return the snapshot times [0, Δt, 2Δt, ..., T_eff] on the dt = 0.1 solver grid.

    T_eff is the largest multiple of dt not exceeding ``total_time``; the
    floor is taken with a small tolerance so that durations that are already
    multiples of dt keep their last step (0.3 / 0.1 evaluates to
    2.9999999999999996 in floating point). The snapshot interval is rounded
    to a whole number of solver steps (at least one), and T_eff is always the
    last entry even when it does not fall on the snapshot spacing.

    Args:
        total_time: Requested total time. Non-numeric, non-finite or
            non-positive values yield an empty list.
        snapshot_interval: Requested spacing between snapshots. Non-numeric,
            non-finite or smaller-than-dt values fall back to dt.

    Returns:
        list[float]: Strictly increasing times rounded to 12 decimals;
        ``[0.0]`` when ``total_time`` is positive but shorter than one step.
    """
    dt = 0.1
    tol = 1e-9

    try:
        T_val = float(total_time)
    except (ValueError, TypeError, OverflowError):
        return []
    if not math.isfinite(T_val) or T_val <= 0:
        return []

    # Tolerant floor: see the docstring for why a plain floor loses a step.
    steps = int(math.floor(T_val / dt + tol))
    if steps <= 0:
        return [0.0]

    try:
        snapshot_dt_val = float(snapshot_interval)
    except (ValueError, TypeError, OverflowError):
        snapshot_dt_val = dt
    if not math.isfinite(snapshot_dt_val) or snapshot_dt_val < dt:
        snapshot_dt_val = dt
    stride = max(1, int(round(snapshot_dt_val / dt)))

    # Work in integer step indices so no float accumulation or float-step
    # range is involved; times are derived from the indices at the very end.
    step_indices = list(range(0, steps + 1, stride))
    if step_indices[-1] != steps:
        step_indices.append(steps)
    return [round(index * dt, 12) for index in step_indices]
def run_sve(field, x, y, T, snapshot_time, nx, initial_state, impulse_pos, progress_callback=None, print_callback=None):
    """Statevector Estimator for time-series field values.

    For every snapshot time returned by create_time_frames(T, snapshot_time)
    the field magnitude at each monitor point is estimated and given a sign
    relative to the reference Ez value at the impulse position.

    Supports both single-point and multi-point modes.
    - Single-point (backward compatible): x, y are integers; returns list[float].
    - Multi-point: x is a list/tuple of (ix, iy) integer pairs and y is None;
      returns dict[(ix, iy) -> list[float]].

    Args:
        field: Field name passed to the estimator helpers; 'Ez' gets special
            handling at the impulse position.
        x, y: Monitor point(s), see the modes above.
        T: Total time handed to create_time_frames().
        snapshot_time: Snapshot interval handed to create_time_frames().
        nx: Grid size per axis.
        initial_state: Currently unused - see the review note in the body.
        impulse_pos: (x, y) grid node of the impulse, also the sign reference.
        progress_callback: Optional callable receiving the progress in percent
            after each frame.
        print_callback: Optional callable receiving log messages; ``print``
            is used when it is not given.
    """
    def _log(msg):
        if print_callback:
            print_callback(msg)
        else:
            print(msg)

    # Fixed estimator settings.
    na, dt, R = 1, 0.1, 4
    nq = int(np.ceil(np.log2(nx)))

    # Normalize monitor points input
    multi = isinstance(x, (list, tuple)) and y is None
    if multi:
        points = [tuple(map(int, pt)) for pt in x]
    else:
        points = [(int(x), int(y))]

    xref, yref = impulse_pos
    offset = 0
    grid_dims = (nx, nx)
    # NOTE(review): the initial_state argument is discarded here and replaced
    # by a delta impulse at impulse_pos - confirm this is intended (the
    # reference helper below is not given any state either).
    initial_state = create_impulse_state(grid_dims, impulse_pos)

    dp = 2 * R * np.pi / 2**na
    p = np.arange(- R * np.pi, R * np.pi, step=dp)
    fp = np.exp(-np.abs(p))
    norm = np.linalg.norm(fp)

    time_frames = create_time_frames(T, snapshot_time)
    total_frames = len(time_frames)
    _log(f"Starting QPU simulation: T={T}s, frames={total_frames}, points={len(points)}")

    # Prepare outputs
    if multi:
        series_by_point = {(px, py): [] for (px, py) in points}
    else:
        series_single = []

    for idx, frame_time in enumerate(time_frames):
        steps = int(math.ceil(frame_time / dt))

        # Reference Ez field at impulse location for sign
        Eref = Eref_value(nx, nq, R, dt, na, steps, xref, yref, field_ref='Ez')

        for (px, py) in points:
            circ_magnitude = circ_for_magnitude(field, px, py, nx, na, R, dt, initial_state, steps)
            magnitude = get_absolute_field_value(circ_magnitude, nq, na, offset, norm)

            at_reference = field == 'Ez' and px == xref and py == yref
            if at_reference:
                # The monitored value is the reference itself: reuse its sign.
                positive = not (Eref < 0)
            else:
                circsum, circdiff = circuits_for_sign(field, px, py, nx, na, dt, R, initial_state, steps, xref, yref, field_ref='Ez')
                sign = get_relative_sign(circsum, circdiff, nq, na)
                positive = (sign == 'same' and Eref > 0) or (sign == 'different' and Eref < 0)
            Field_value = magnitude if positive else -magnitude

            if multi:
                series_by_point[(px, py)].append(Field_value)
            else:
                series_single.append(Field_value)

        # Calculate and report progress
        pct = (idx + 1) / total_frames * 100
        if progress_callback:
            progress_callback(pct)
        _log(f"SVE Progress: {int(pct)}% (frame {idx + 1}/{total_frames})")

    _log("Statevector Estimator simulation completed.")
    if multi:
        return series_by_point
    return series_single