# quantum/utils/EBU_Quantum/no_body/base_functions.py
# harishaseebat92
# IonQ QPU branch is now implemented
# 20f8534
import numpy as np
import scipy.sparse as sp
import math
import random
import time
import psutil
from concurrent.futures import ThreadPoolExecutor, as_completed
import os
class MissingCredentialError(RuntimeError):
    """Signal that a required API key or secret is not set at runtime.

    Subclasses ``RuntimeError`` so existing ``except RuntimeError`` handlers
    still catch it.
    """
def _require_env(var_name: str, *, context: str) -> str:
    """Return the value of environment variable ``var_name`` or fail loudly.

    Parameters
    ----------
    var_name : str
        Name of the environment variable holding the secret.
    context : str
        Short description of the operation that needs the secret; it is only
        used to build the error message.

    Returns
    -------
    str
        The variable's value exactly as stored (not stripped).

    Raises
    ------
    MissingCredentialError
        If the variable is unset or contains only whitespace.
    """
    secret = os.environ.get(var_name)
    # Accept anything that is set and not blank; the raw value is returned.
    if secret is not None and str(secret).strip() != "":
        return secret
    raise MissingCredentialError(
        f"Missing required secret '{var_name}'. "
        f"Set it as a runtime environment variable (e.g., Hugging Face Space → Settings → Secrets) "
        f"before running {context}."
    )
import matplotlib.pyplot as plt
from scipy.special import jn
from scipy.sparse import identity, csr_matrix, kron, diags, eye
from qiskit.circuit import QuantumCircuit, QuantumRegister, ClassicalRegister
from qiskit.circuit.library import MCXGate, MCPhaseGate, RXGate, CRXGate, QFTGate, StatePreparation, PauliEvolutionGate, RZGate
from qiskit.quantum_info import SparsePauliOp, Statevector, Operator, Pauli
from scipy.linalg import expm
# from tools import *
from qiskit.qasm3 import dumps # QASM 3 exporter
from qiskit.qasm3 import loads
from qiskit.circuit.library import QFT
from qiskit.primitives import StatevectorEstimator
from qiskit import transpile
from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager
from qiskit_aer import Aer
from qiskit_ibm_runtime import QiskitRuntimeService, EstimatorV2 as Estimator, Batch
from qiskit_ionq import IonQProvider
def Wj_block(j, n, ctrl_state, theta, lam, name='Wj_block', xgate=False):
    """Build one ``W_j`` block as a gate on ``n + j`` qubits.

    Layout of the block: qubits ``0 .. n-1`` are control qubits matched
    against ``ctrl_state``; qubits ``n .. n+j-1`` are the block qubits, the
    last of which (index ``n + j - 1``) is the RZ target.

    Sequence applied:
      1. CX from the last qubit onto the other block qubits (only if ``j > 1``).
      2. Phase ``lam`` (skipped when ``lam == 0``) and Hadamard on the last qubit.
      3. Optional X gates on the lower block qubits (see ``xgate``).
      4. A multi-controlled ``RZ(theta)`` over all ``n + j`` qubits.
      5. Steps 3, 2 and 1 undone in reverse order.

    Parameters
    ----------
    j : int
        Number of block qubits.
    n : int
        Number of leading control qubits.
    ctrl_state : str
        Control pattern for the leading controls; when ``j > 1`` it is
        prefixed with ``"1" * (j - 1)`` for the lower block qubits.
    theta : float
        RZ rotation angle.
    lam : float
        Phase-gate angle; ``0`` disables the phase gates.
    name : str
        Circuit name / gate label. It is replaced by ``f' $W_{j}_block$ '``
        whenever ``xgate`` is falsy.
    xgate : bool | list | tuple
        ``True`` puts X on qubits ``n .. n+j-2``; a list/tuple puts X on
        qubit ``n + idx`` for every truthy entry. Ignored unless ``j > 1``.

    Returns
    -------
    Gate
        ``QuantumCircuit.to_gate`` result labelled with ``name``.
    """
    if not xgate:
        name = f' $W_{j}_block$ '

    width = n + j
    top = width - 1
    block = QuantumCircuit(width, name=name)

    def _toggle_x():
        # X gates wrap the controlled rotation; they are applied before and after it.
        if not (xgate and j > 1):
            return
        if isinstance(xgate, (list, tuple)):
            # Selective application: only where the flag is set.
            for offset, flag in enumerate(xgate):
                if flag:
                    block.x(n + offset)
        elif xgate is True:
            # Apply to every lower block qubit.
            block.x(range(n, top))

    # Basis change on the block qubits.
    if j > 1:
        block.cx(top, range(n, top))
    if lam != 0:
        block.p(lam, top)
    block.h(top)

    _toggle_x()

    # Multi-controlled RZ (decomposed later by Qiskit). The lower block
    # qubits contribute extra "1" controls only when j > 1.
    extra_controls = j - 1 if j > 1 else 0
    mcrz = RZGate(theta).control(
        len(ctrl_state) + extra_controls,
        ctrl_state="1" * extra_controls + ctrl_state,
    )
    block.append(mcrz, range(0, width))

    _toggle_x()

    # Undo the basis change.
    block.h(top)
    if lam != 0:
        block.p(-lam, top)
    if j > 1:
        block.cx(top, range(n, top))

    return block.to_gate(label=name)
def V1(nx, dt, name = "V1"):
    """Assemble the ``V1`` circuit from four ``Wj_block`` gates.

    The circuit uses a ``2 * n`` qubit register (``n = ceil(log2(nx))``)
    followed by a 2-qubit register. All blocks use ``lam = 0`` and rotation
    angles ``±dt``.

    Parameters
    ----------
    nx : int
        Grid size per dimension; determines ``n``.
    dt : float
        Time step used as rotation angle.
    name : str
        Accepted for interface compatibility; not used by the body.

    Returns
    -------
    QuantumCircuit
    """
    n_bits = int(np.ceil(np.log2(nx)))
    deriv = QuantumRegister(2 * n_bits)
    blk = QuantumRegister(2)
    circuit = QuantumCircuit(deriv, blk)

    blk_qubits = list(blk[:])
    first_half = list(deriv[0:n_bits])
    second_half = list(deriv[n_bits:2 * n_bits])

    # W1: first half of the register, all-zero control pattern.
    circuit.append(
        Wj_block(2, n_bits, "0" * n_bits, -dt, 0, xgate=True),
        first_half + blk_qubits,
    )
    # W2: first-half qubit 0 moves into the block-qubit section.
    circuit.append(
        Wj_block(3, n_bits - 1, "1" * (n_bits - 1), dt, 0, xgate=[0, 1]),
        list(deriv[1:n_bits]) + [deriv[0]] + blk_qubits,
    )
    # W3: second half of the register plus blk[0] as controls.
    circuit.append(
        Wj_block(1, n_bits + 1, "0" * (n_bits + 1), dt, 0, xgate=False),
        second_half + blk_qubits,
    )
    # W4: second-half qubit n moves into the block-qubit section.
    circuit.append(
        Wj_block(2, n_bits, "0" + "1" * (n_bits - 1), -dt, 0, xgate=False),
        list(deriv[n_bits + 1:2 * n_bits]) + [blk[0]] + [deriv[n_bits]] + [blk[1]],
    )
    return circuit
def V2(nx, dt, name = "V2"):
    """Assemble the ``V2`` circuit from a sequence of ``Wj_block`` gates.

    Same register layout as ``V1``: a ``2 * n`` qubit register
    (``n = ceil(log2(nx))``) followed by a 2-qubit register. Every block
    uses ``lam = -pi/2`` and rotation angles ``±dt`` or ``±2*dt``.

    Parameters
    ----------
    nx : int
        Grid size per dimension; determines ``n``.
    dt : float
        Time step used to derive the rotation angles.
    name : str
        Accepted for interface compatibility; not used by the body.

    Returns
    -------
    QuantumCircuit
    """
    n_bits = int(np.ceil(np.log2(nx)))
    deriv = QuantumRegister(2 * n_bits)
    blk = QuantumRegister(2)
    circuit = QuantumCircuit(deriv, blk)

    lam = -np.pi / 2
    blk_qubits = list(blk[:])
    first_half = list(deriv[0:n_bits])
    second_half = list(deriv[n_bits:2 * n_bits])

    # --- blocks built on the first half of the register ---
    # W1
    circuit.append(Wj_block(2, 0, "", -2 * dt, lam, xgate=True), blk_qubits)
    # W2: one block per prefix length of the first half
    for j in range(1, n_bits + 1):
        circuit.append(
            Wj_block(2 + j, 0, "", 2 * dt, lam, xgate=[1] * (j - 1) + [0, 1]),
            list(deriv[0:j]) + blk_qubits,
        )
    # W3
    circuit.append(
        Wj_block(2, n_bits, "0" * n_bits, -dt, lam, xgate=True),
        first_half + blk_qubits,
    )
    # W4
    circuit.append(
        Wj_block(2, n_bits, "1" * n_bits, 2 * dt, lam, xgate=True),
        first_half + blk_qubits,
    )
    # W5
    circuit.append(
        Wj_block(3, n_bits - 1, "1" * (n_bits - 1), dt, lam, xgate=[0, 1]),
        list(deriv[1:n_bits]) + [deriv[0]] + blk_qubits,
    )

    # --- blocks built on the second half of the register ---
    # W6
    circuit.append(Wj_block(1, 1, "0", 2 * dt, lam, xgate=False), blk_qubits)
    # W7: one block per prefix length of the second half, controlled on blk[0]
    for j in range(1, n_bits + 1):
        circuit.append(
            Wj_block(1 + j, 1, "0", -2 * dt, lam, xgate=[1] * (j - 1)),
            [blk[0]] + list(deriv[n_bits:n_bits + j]) + [blk[1]],
        )
    # W8
    circuit.append(
        Wj_block(1, n_bits + 1, "0" * (n_bits + 1), dt, lam, xgate=False),
        second_half + blk_qubits,
    )
    # W9
    circuit.append(
        Wj_block(1, n_bits + 1, "0" + "1" * n_bits, -2 * dt, lam, xgate=False),
        second_half + blk_qubits,
    )
    # W10
    circuit.append(
        Wj_block(2, n_bits, "0" + "1" * (n_bits - 1), -dt, lam, xgate=False),
        list(deriv[n_bits + 1:2 * n_bits]) + [blk[0]] + [deriv[n_bits]] + [blk[1]],
    )
    return circuit
def schro(nx, na, R, dt, initial_state, steps):
    """Build the time-evolution circuit on a ``system`` + ``ancilla`` register.

    The circuit prepares ``initial_state`` on the system register and a
    normalised ``exp(-|p|)`` profile on the ancilla register (warped phase
    transformation), applies a QFT and an X on the last ancilla qubit, then
    repeats one evolution step ``steps`` times and finally undoes the X and
    the QFT.

    One evolution step consists of:
      * for each ancilla qubit ``j``: controlled ``V1`` repeated ``2**j`` times,
      * uncontrolled ``V1`` inverse repeated ``2**(na-1)`` times,
      * ``V2`` once.

    Parameters
    ----------
    nx : int
        Grid size per dimension; the system register has
        ``2 * ceil(log2(nx)) + 2`` qubits.
    na : int
        Number of ancilla qubits.
    R : float
        Half-width factor of the ancilla grid, which spans ``[-R*pi, R*pi)``.
    dt : float
        Time step passed to ``V1`` / ``V2``.
    initial_state : array-like
        Amplitudes handed to ``StatePreparation`` for the system register.
    steps : int
        Number of evolution steps.

    Returns
    -------
    QuantumCircuit
    """
    nq = int(np.ceil(np.log2(nx)))

    # Warped phase transformation: sample exp(-|p|) on the ancilla grid.
    dp = 2 * R * np.pi / 2**na
    p = np.arange(-R * np.pi, R * np.pi, step=dp)
    fp = np.exp(-np.abs(p))

    # Construct the quantum circuit.
    system = QuantumRegister(2 * nq + 2, name='system')
    ancilla = QuantumRegister(na, name='ancilla')
    qc = QuantumCircuit(system, ancilla)

    # Initialization.
    qc.append(StatePreparation(initial_state), system)
    qc.append(StatePreparation(fp / np.linalg.norm(fp)), ancilla)

    # QFT on the ancilla.
    qc.append(QFTGate(na), ancilla)
    qc.x(ancilla[-1])

    A1 = V1(nx, dt, name="V1").to_gate()
    A2 = V2(nx, dt, name="V2").to_gate()

    if steps > 0:
        system_qubits = system[:]
        # These gates are identical in every step, so build them once
        # instead of recomputing control()/inverse()/repeat() per step.
        controlled_powers = [A1.control().repeat(2**j) for j in range(na)]
        inverse_power = A1.inverse().repeat(2**(na - 1))

        # Hamiltonian simulation for `steps` steps.
        for _ in range(steps):
            for j, gate in enumerate(controlled_powers):
                qc.append(gate, [ancilla[j]] + system_qubits)
            qc.append(inverse_power, system_qubits)
            qc.append(A2, system_qubits)

    # Rearrange eta: undo the X and the QFT on the ancilla.
    qc.x(ancilla[-1])
    qc.append(QFTGate(na).inverse(), ancilla)
    return qc
def circ_for_magnitude(field, x, y, nx, na, R, dt, initial_state, steps):
    """Build the circuit whose extra qubit flags one basis state of ``schro``.

    A 1-qubit ``Naimark`` register is added to the ``schro`` circuit and a
    multi-controlled X, controlled on every other qubit, flips it for a
    single control pattern: ``'1'`` followed by the zero-padded binary form
    of the flattened field index.

    Flattened index: ``nx*y + x`` for ``'Ez'``, offset by ``2*nx*nx`` for
    ``'Hx'`` and by ``3*nx*nx`` for any other ``field`` value.

    Parameters
    ----------
    field : str
        Field component name.
    x, y : int
        Grid indices of the point of interest.
    nx, na, R, dt, initial_state, steps
        Forwarded unchanged to ``schro``.

    Returns
    -------
    QuantumCircuit
    """
    circuit = schro(nx, na, R, dt, initial_state, steps)
    flag = QuantumRegister(1, name='Naimark')
    circuit.add_register(flag)

    cell = nx * y + x
    if field == 'Ez':
        index = cell
    elif field == 'Hx':
        index = 2 * nx * nx + cell
    else:
        index = 3 * nx * nx + cell

    # All qubits except the flag are controls; the pattern is '1' + index bits.
    width = circuit.num_qubits - 2
    pattern = '1' + format(index, f'0{width}b')
    circuit.mcx(circuit.qubits[:-1], flag[0], ctrl_state=pattern)
    return circuit
def circuits_for_sign(field, x, y, nx, na, dt, R, initial_state, steps, xref, yref, field_ref = 'Ez'):
    """Build the "sum" and "difference" circuits for a point and a reference.

    Starting from the ``schro`` circuit plus a 1-qubit ``Naimark`` register,
    the basis is rotated so that the reference bitstring maps to all zeros
    and the target bitstring to a single ``1`` at the lowest differing bit
    (``control``); a Hadamard on that bit then mixes the two. Two copies are
    returned, each ending in a multi-controlled X onto the Naimark qubit:
    one with the all-zeros control pattern, one with a single ``1`` at
    ``control``.

    Note that the parameter order is ``dt, R`` here while ``schro`` takes
    ``R, dt``.

    Parameters
    ----------
    field, x, y
        Field component and grid indices of the target point.
    nx, na, dt, R, initial_state, steps
        Forwarded to ``schro``.
    xref, yref
        Grid indices of the reference point.
    field_ref : str
        Field component of the reference point.

    Returns
    -------
    tuple[QuantumCircuit, QuantumCircuit]
        ``(sum_circuit, difference_circuit)``.

    Raises
    ------
    ValueError
        From ``list.index`` when target and reference bitstrings are equal.
    """
    def _flat_index(component, col, row):
        # Same component offsets as circ_for_magnitude.
        if component == 'Ez':
            return nx * row + col
        elif component == 'Hx':
            return 2 * nx * nx + nx * row + col
        else:
            return 3 * nx * nx + nx * row + col

    circuit = schro(nx, na, R, dt, initial_state, steps)
    flag = QuantumRegister(1, name='Naimark')
    circuit.add_register(flag)

    index = _flat_index(field, x, y)
    index_ref = _flat_index(field_ref, xref, yref)

    # Little-endian bit lists, each with a trailing 1 appended.
    width = circuit.num_qubits - 2
    target_bits = [(index >> k) & 1 for k in range(width)] + [1]
    ref_bits = [(index_ref >> k) & 1 for k in range(width)] + [1]

    # Convert the reference bitstring to 00000.
    for qubit, bit in enumerate(ref_bits):
        if bit == 1:
            circuit.x(qubit)

    diff_bits = [r ^ t for r, t in zip(ref_bits, target_bits)]
    control = diff_bits.index(1)

    # Convert the other bitstring to 0001000.
    for qubit, bit in enumerate(diff_bits):
        if bit == 1 and qubit != control:
            circuit.cx(control, qubit)
    circuit.h(control)

    n_controls = circuit.num_qubits - 1
    state_sum = '0' * n_controls
    state_diff = '0' * (n_controls - control - 1) + '1' + '0' * control

    diff_circuit = circuit.copy()
    controls = circuit.qubits[:-1]
    circuit.mcx(controls, flag[0], ctrl_state=state_sum)
    diff_circuit.mcx(controls, flag[0], ctrl_state=state_diff)
    return circuit, diff_circuit
def get_absolute_field_values(all_circuits, shots, pm_optimization_level, simulation = "True", platform = "IBM", progress_callback=None, print_callback=None):
    """
    Execute circuits on a quantum backend and return field magnitudes.

    Each circuit is submitted to an Estimator with the observable ``Z`` on
    its last qubit. The returned value per circuit is
    ``sqrt((1 - <Z>) / 2)``.

    Parameters
    ----------
    all_circuits : dict
        Mapping of result key -> ``QuantumCircuit``.
    shots : int
        Converted to the estimator precision ``1 / sqrt(shots)``.
    pm_optimization_level : int
        Optimization level of the preset pass manager (simulator and IBM
        branches only).
    simulation : str
        ``"True"`` runs on the Aer simulator; ``"False"`` runs on the QPU
        selected by ``platform``.
    platform : str
        ``"IBM"`` or ``"IONQ"``; only consulted when ``simulation == "False"``.
    progress_callback : callable, optional
        Function to report progress (40-90 range for Step 2 execution).
        Called as ``cb(pct, message)``, falling back to ``cb(pct)``.
    print_callback : callable, optional
        Function for logging messages.

    Returns
    -------
    dict
        Same keys as ``all_circuits``, each mapped to ``sqrt((1 - <Z>) / 2)``.

    Raises
    ------
    MissingCredentialError
        If the API key for the chosen QPU platform is not set.
    ValueError
        If the ``simulation`` / ``platform`` combination is not supported.
    """
    import time as time_module

    def _log(msg):
        if print_callback:
            print_callback(msg)

    def _progress(pct, message=None):
        # Best-effort: a failing progress callback must never abort the run.
        if progress_callback:
            try:
                progress_callback(pct, message)
            except TypeError:
                # Old-style callback without message parameter
                try:
                    progress_callback(pct)
                except Exception:
                    pass
            except Exception:
                pass

    def _backend_name(backend):
        # `name` is a method on BackendV1 and a plain attribute on BackendV2.
        name = getattr(backend, "name", None)
        return name() if callable(name) else name

    jobs = {}
    # Last status seen per job; filled in lazily by the polling loop below.
    job_status = {}
    total_circuits = len(all_circuits)

    _log(f"Step 2: Submitting {total_circuits} circuit(s) for execution...")
    _progress(40, f"Submitting {total_circuits} circuits...")

    if simulation == "True":
        backend = Aer.get_backend('qasm_simulator')
        backend_label = _backend_name(backend)
        _log(f"Using simulator backend: {backend_label}")
        _progress(42, f"Backend: {backend_label}")
        pm = generate_preset_pass_manager(backend=backend, optimization_level=pm_optimization_level)
        with Batch(backend=backend, max_time="2h"):
            estimator = Estimator()
            for idx, (key, qc) in enumerate(all_circuits.items()):
                qc = transpile(qc, basis_gates=['u', 'cx', 'rz', 'sx', 'x'])
                # Z on the last qubit, identity elsewhere.
                pauli_label = 'Z' + 'I' * (qc.num_qubits - 1)
                qc_compiled = pm.run(qc)
                layout = qc_compiled.layout
                observable = SparsePauliOp(Pauli(pauli_label)).apply_layout(layout)
                job = estimator.run([(qc_compiled, observable)], precision=1 / np.sqrt(shots))
                jobs[key] = job
                # Progress: 42-50% for submission
                submit_pct = 42 + ((idx + 1) / total_circuits) * 8
                _progress(submit_pct, f"Submitted circuit {idx+1}/{total_circuits}")
    elif simulation == "False" and platform == "IBM":
        _log("Connecting to IBM Quantum service...")
        _progress(42, "Connecting to IBM Quantum...")
        ibm_token = _require_env("API_KEY_IBM_EM", context="IBM EM QPU execution")
        service = QiskitRuntimeService(
            channel="ibm_cloud",
            token=ibm_token,
            instance="crn:v1:bluemix:public:quantum-computing:us-east:a/15157e4350c04a9dab51b8b8a4a93c86:e29afd91-64bf-4a82-8dbf-731e6c213595::",
        )
        backend = service.least_busy(operational=True, simulator=False)
        backend_label = _backend_name(backend)
        _log(f"Selected IBM QPU backend: {backend_label}")
        _progress(45, f"Backend: {backend_label}")
        pm = generate_preset_pass_manager(backend=backend, optimization_level=pm_optimization_level)
        with Batch(backend=backend, max_time="2h"):
            estimator = Estimator()
            estimator.options.resilience_level = 0
            estimator.options.dynamical_decoupling.enable = True
            estimator.options.dynamical_decoupling.sequence_type = "XpXm"
            estimator.options.resilience.measure_mitigation = True
            estimator.options.resilience.zne_mitigation = True
            estimator.options.resilience.zne.noise_factors = (1.1, 1.3, 1.5)
            estimator.options.resilience.zne.extrapolator = ("exponential", "linear")
            for idx, (key, qc) in enumerate(all_circuits.items()):
                qc = transpile(qc, basis_gates=['u', 'cx', 'rz', 'sx', 'x'])
                pauli_label = 'Z' + 'I' * (qc.num_qubits - 1)
                qc_compiled = pm.run(qc)
                layout = qc_compiled.layout
                observable = SparsePauliOp(Pauli(pauli_label)).apply_layout(layout)
                job = estimator.run([(qc_compiled, observable)], precision=1 / np.sqrt(shots))
                jobs[key] = job
                # Progress: 45-55% for submission
                submit_pct = 45 + ((idx + 1) / total_circuits) * 10
                _progress(submit_pct, f"Submitted circuit {idx+1}/{total_circuits}")
    elif simulation == "False" and platform == "IONQ":
        _log("Connecting to IonQ service...")
        _progress(42, "Connecting to IonQ...")
        ionq_token = _require_env("API_KEY_IONQ_EM", context="IonQ EM QPU execution")
        os.environ.setdefault("IONQ_API_TOKEN", ionq_token)
        provider = IonQProvider()
        ionq_backend = provider.get_backend("qpu.forte-enterprise-1")
        backend_label = _backend_name(ionq_backend)
        _log(f"Selected IonQ backend: {backend_label}")
        _progress(45, f"Backend: {backend_label}")
        # NOTE(review): this pass manager is built but never run, and
        # pm_optimization_level is not applied in this branch - confirm intent.
        pm = generate_preset_pass_manager(backend=ionq_backend, optimization_level=1)
        with Batch(backend=ionq_backend, max_time="2h"):
            estimator = Estimator()
            for idx, (key, qc) in enumerate(all_circuits.items()):
                pauli_label = 'Z' + 'I' * (qc.num_qubits - 1)
                qc_compiled = transpile(qc, basis_gates=['ry', 'rz', 'rx', 'h', 'cx'])
                layout = qc_compiled.layout
                observable = SparsePauliOp(Pauli(pauli_label)).apply_layout(layout)
                job = estimator.run([(qc_compiled, observable)], precision=1 / np.sqrt(shots))
                jobs[key] = job
                submit_pct = 45 + ((idx + 1) / total_circuits) * 10
                _progress(submit_pct, f"Submitted circuit {idx+1}/{total_circuits}")
    else:
        # Previously this fell through and returned {}, which only surfaced
        # later as a KeyError in the caller.
        raise ValueError(
            f"Unsupported execution target: simulation={simulation!r}, platform={platform!r}. "
            "Expected simulation='True', or simulation='False' with platform 'IBM' or 'IONQ'."
        )

    # Poll for job completion with status updates (55-85%)
    _log("Jobs submitted. Waiting for results...")
    _progress(55, "Jobs queued, waiting for execution...")

    total_jobs = len(jobs)
    completed_jobs = 0
    pending_keys = set(jobs)
    poll_errors_reported = set()
    poll_count = 0
    max_polls = 600  # ~10 minutes with 1s interval

    while pending_keys and poll_count < max_polls:
        for key in list(pending_keys):
            try:
                status = jobs[key].status()
                status_name = status.name if hasattr(status, 'name') else str(status)
                if status_name != job_status.get(key):
                    job_status[key] = status_name
                    _log(f"Job {key[:30]}... Status: {status_name}")
                if status_name in ('DONE', 'ERROR', 'CANCELLED'):
                    pending_keys.discard(key)
                    completed_jobs += 1
                    # Progress: 55-85% based on completion
                    completion_pct = 55 + (completed_jobs / total_jobs) * 30
                    _progress(completion_pct, f"Completed {completed_jobs}/{total_jobs} jobs")
            except Exception as exc:
                # Keep polling, but report the first failure per job instead
                # of hiding it.
                if key not in poll_errors_reported:
                    poll_errors_reported.add(key)
                    _log(f"Job {key[:30]}... status poll failed: {exc}")

        if pending_keys:
            # Show indeterminate progress while waiting
            queued_count = sum(1 for s in job_status.values() if s in ('QUEUED', 'VALIDATING', 'INITIALIZING'))
            running_count = sum(1 for s in job_status.values() if s == 'RUNNING')
            if running_count > 0:
                # Slowly increment while running (55-85 range)
                run_progress = 55 + min(30, poll_count * 0.1)
                _progress(run_progress, f"Running... ({running_count} active, {queued_count} queued)")
            elif queued_count > 0:
                queue_progress = 55 + min(10, poll_count * 0.05)
                _progress(queue_progress, f"Queued... (waiting {poll_count}s)")
            time_module.sleep(1)
            poll_count += 1

    if pending_keys:
        # Polling gave up; job.result() below blocks until each job ends.
        _log(f"Status polling timed out with {len(pending_keys)} job(s) still pending. Waiting on results...")
    else:
        _log("All jobs completed. Retrieving results...")
    _progress(87, "Retrieving results...")

    results = {}
    for idx, (key, job) in enumerate(jobs.items()):
        res = job.result()[0]
        z_exp = res.data.evs.item()  # expectation value
        # Shot noise or error mitigation can push <Z> slightly above 1;
        # clamp so the square root never yields NaN.
        results[key] = np.sqrt(max(0.0, (1 - z_exp) / 2))
        # Progress: 87-90% for result retrieval
        result_pct = 87 + ((idx + 1) / total_jobs) * 3
        _progress(result_pct, f"Retrieved result {idx+1}/{total_jobs}")

    _progress(90, "All results retrieved")
    return results
def build_optimized_circuits(nx, impulse_pos, field, grid_point, time_values, optimization="True", progress_callback=None):
    """Build (and optionally optimize) every circuit needed per time frame.

    For each entry of ``time_values`` a reference circuit for ``Ez`` at the
    impulse position is built. Unless the monitored quantity is that very
    reference (``field == "Ez"`` at ``impulse_pos``), a magnitude circuit and
    a sum/difference pair for ``grid_point`` are built as well.

    With ``optimization == "True"`` every circuit is passed through
    ``generate_optimized_circuit`` on a thread pool; submission waits while
    system memory usage is above 75 %. Otherwise the circuits are stored as
    built.

    Parameters
    ----------
    nx : int
        Grid size per dimension.
    impulse_pos : tuple
        ``(xref, yref)`` grid indices of the impulse / reference point.
    field : str
        Field component monitored at ``grid_point``.
    grid_point : tuple
        ``(x, y)`` grid indices of the monitor point.
    time_values : sequence of float
        Simulation times; each is converted to ``ceil(t / 0.1)`` steps.
    optimization : str
        ``"True"`` enables the optimization pass.
    progress_callback : callable, optional
        Called with a percentage in the 0-40 range; exceptions are ignored.

    Returns
    -------
    tuple
        ``(all_circuits, norm_offset_ref, norm_anc)`` where ``all_circuits``
        maps key -> circuit, ``norm_offset_ref`` maps
        ``"norm_offset_ref_time{t}"`` -> ``(norm, offset)`` of the reference
        initial state, and ``norm_anc`` is the norm of the ``p >= 0`` half of
        the ancilla profile.
    """
    na = 1
    dt = 0.1
    R = 4
    MAX_THREADS = 20  # soft upper bound
    MEMORY_THRESHOLD = 75  # maximum % RAM to allow for parallel tasks

    xref, yref = impulse_pos
    x, y = grid_point
    initial_state = create_impulse_state((nx, nx), impulse_pos)

    dp = 2 * R * np.pi / 2**na
    p = np.arange(-R * np.pi, R * np.pi, step=dp)
    fp = np.exp(-np.abs(p))
    norm_anc = np.linalg.norm(fp[2**(na - 1):])  # norm of p>=0

    all_circuits = {}
    norm_offset_ref = {}
    futures = {}

    optimize = optimization == "True"
    monitoring_reference = field == "Ez" and (x, y) == (xref, yref)

    def _notify(pct):
        # Progress reporting is best-effort only.
        if progress_callback:
            try:
                progress_callback(pct)
            except Exception:
                pass

    def _register(pool, key, circuit):
        """Queue the circuit for optimization, or store it directly."""
        if optimize:
            # Submit only once memory usage is below the threshold.
            while psutil.virtual_memory().percent > MEMORY_THRESHOLD:
                time.sleep(0.5)  # wait until memory drops
            futures[pool.submit(generate_optimized_circuit, circuit)] = key
        else:
            all_circuits[key] = circuit

    with ThreadPoolExecutor(max_workers=MAX_THREADS) as pool:
        total_steps = len(time_values)
        for idx, time_value in enumerate(time_values):
            steps = int(math.ceil(time_value / dt))
            offset_ref = 0.31 if steps >= 9 else 0

            # Reference initial state: delta at the impulse plus a uniform
            # offset on the first nx*nx entries.
            deltastate = np.zeros(4 * nx * nx)
            deltastate[nx * yref + xref] = 1
            deltastate[0:nx * nx] += offset_ref
            norm_ref_state = np.linalg.norm(deltastate)
            initial_state_ref = deltastate / norm_ref_state
            norm_offset_ref[f"norm_offset_ref_time{time_value}"] = (norm_ref_state, offset_ref)

            # Build reference circuit
            qc_ref = circ_for_magnitude("Ez", xref, yref, nx, na, R, dt, initial_state_ref, steps)
            key_ref = f"fieldEz_ref_time{time_value}"

            if monitoring_reference:
                _register(pool, key_ref, qc_ref)
            else:
                circ_magnitude = circ_for_magnitude(field, x, y, nx, na, R, dt, initial_state, steps)
                circsum, circdiff = circuits_for_sign(
                    field, x, y, nx, na, dt, R, initial_state, steps, xref, yref, field_ref='Ez'
                )
                _register(pool, key_ref, qc_ref)
                _register(pool, f"field{field}_magnitude_x{x}_y{y}_time{time_value}", circ_magnitude)
                _register(pool, f"field{field}_sum_x{x}_y{y}_time{time_value}", circsum)
                _register(pool, f"field{field}_diff_x{x}_y{y}_time{time_value}", circdiff)

            # Without optimization, construction itself covers 0-40%.
            if not optimize:
                _notify(((idx + 1) / total_steps) * 40.0)

        # Collect results as they finish
        total_futures = len(futures)
        if total_futures > 0:
            for completed_count, future in enumerate(as_completed(futures), start=1):
                all_circuits[futures[future]] = future.result()
                # Map 0-100% of this task to 0-40% of total progress
                _notify((completed_count / total_futures) * 40.0)
        else:
            # No futures (no optimization): this step is already done.
            _notify(40.0)

    return all_circuits, norm_offset_ref, norm_anc
def create_time_frames(total_time, snapshot_interval):
    """
    Create a list of time frames for simulation snapshots.

    Both inputs are snapped to multiples of the fixed step ``dt = 0.1``.
    Frames start at ``0.0``, advance by the snapped snapshot interval and
    always include the last whole step not exceeding ``total_time``.

    The docstring this replaces stated that the logic matches
    ``delta_impulse_generator.py`` ``run_sve()``. NOTE(review): the
    step-count rounding fix below may need to be mirrored there.

    Parameters
    ----------
    total_time : float or str
        Total simulation time. Non-numeric, non-finite or non-positive
        values yield an empty list.
    snapshot_interval : float or str
        Desired spacing between frames. Non-numeric, non-finite or
        too-small values fall back to ``dt``.

    Returns
    -------
    list[float]
        Strictly increasing frame times rounded to 12 decimals; ``[0.0]``
        when ``total_time`` is shorter than one step.
    """
    dt = 0.1
    tol = 1e-9

    try:
        T_val = float(total_time)
    except (ValueError, TypeError):
        return []
    # NaN / inf would crash in int() below; treat them as invalid input.
    if not math.isfinite(T_val) or T_val <= 0:
        return []

    # Binary floating point makes e.g. 0.3 / 0.1 == 2.9999999999999996, so a
    # bare floor() would drop the last step for exact multiples of dt.
    steps = int(np.floor(T_val / dt + tol))
    if steps <= 0:
        return [0.0]
    T_eff = steps * dt

    try:
        snapshot_dt_val = float(snapshot_interval)
    except (ValueError, TypeError):
        snapshot_dt_val = dt
    if not math.isfinite(snapshot_dt_val) or snapshot_dt_val < dt:
        snapshot_dt_val = dt

    # Snap the snapshot interval to a whole number of steps.
    k = max(1, int(round(snapshot_dt_val / dt)))
    snapshot_dt_eff = k * dt

    times = np.arange(0, T_eff + tol, snapshot_dt_eff)
    # Make sure the final time is present even if it is off the snapshot grid.
    if abs(times[-1] - T_eff) > tol:
        times = np.append(times, T_eff)
    times = np.round(times, 12)

    # Drop consecutive near-duplicates and convert to plain floats.
    unique_times = []
    for t in times:
        if not unique_times or abs(t - unique_times[-1]) > tol:
            unique_times.append(float(t))
    return unique_times
def get_field_values(field, x, y, T, snapshot_time, nx, impulse_pos, shots, pm_optimization_level, simulation="True", optimization="True", platform="IBM", progress_callback=None, print_callback=None):
    """
    Simulate one field component at one grid position and return its
    signed time series.

    The run has three stages: circuit construction / optimization (progress
    0-40%), circuit execution (40-90%) and result processing (90-100%).
    Only ONE field component and ONE position are handled per call.

    Parameters
    ----------
    field : str
        Field component to measure: 'Ez', 'Hx', or 'Hy'.
    x : int
        X grid index of the monitor position.
    y : int
        Y grid index of the monitor position.
    T : float
        Total simulation time.
    snapshot_time : float
        Time interval between snapshots.
    nx : int
        Grid dimension (grid is nx x nx).
    impulse_pos : tuple
        (x, y) integer grid indices of the initial impulse.
    shots : int
        Number of shots for QPU execution.
    pm_optimization_level : int
        Pass manager optimization level (0-3).
    simulation : str
        "True" for simulator, "False" for real QPU.
    optimization : str
        "True" to use ADAPT-AQC optimization, "False" otherwise.
    platform : str
        "IBM" or "IONQ".
    progress_callback : callable, optional
        Function to report progress (0-100).
    print_callback : callable, optional
        Function for logging messages; ``print`` is used when omitted.

    Returns
    -------
    field_values : list[float]
        Time-series of field values at the specified position; empty when
        no valid time frames can be generated.
    """
    emit = print_callback if print_callback else print

    def _notify(pct):
        # Progress reporting is best-effort; callback errors are ignored.
        if progress_callback:
            try:
                progress_callback(pct)
            except Exception:
                pass

    xref, yref = impulse_pos
    grid_point = (int(x), int(y))

    # Create time frames from T and snapshot_time (matching run_sve logic)
    time_values = create_time_frames(T, snapshot_time)
    total_frames = len(time_values)
    if total_frames == 0:
        emit("No valid time frames generated.")
        return []

    emit(f"Starting QPU simulation: T={T}s, frames={total_frames}, platform={platform}")
    emit(f"Time frames: {time_values}")
    emit(f"Impulse position (grid): {impulse_pos}")
    emit(f"Monitor position (grid): {grid_point}")
    emit(f"Field component: {field}")

    # --- Step 1: Circuit Construction & Optimization (0-40%) ---
    emit("Step 1: Circuit Construction & Optimization")
    _notify(0.0)
    all_circuits, norm_offset_ref, norm_anc = build_optimized_circuits(
        nx, impulse_pos, field, grid_point, time_values, optimization, progress_callback
    )

    # --- Step 2: Circuit Execution (40-90%) ---
    emit("Step 2: Circuit Execution")
    _notify(40.0)
    results = get_absolute_field_values(
        all_circuits, shots, pm_optimization_level, simulation, platform,
        progress_callback=progress_callback, print_callback=print_callback,
    )

    # --- Step 3: Result Processing (90-100%) ---
    emit("Step 3: Result Processing")
    _notify(90.0)

    at_reference = field == "Ez" and grid_point == (xref, yref)
    gx, gy = grid_point
    Field_values = []
    for idx, time_value in enumerate(time_values):
        norm_ref_state, offset_ref = norm_offset_ref[f"norm_offset_ref_time{time_value}"]
        # Reference Ez value: rescale the measured amplitude, remove the offset.
        Ezref = norm_ref_state * norm_anc * results[f"fieldEz_ref_time{time_value}"] - offset_ref

        if at_reference:
            Field_values.append(Ezref)
        else:
            magnitude = norm_anc * results[f"field{field}_magnitude_x{gx}_y{gy}_time{time_value}"]
            magnitude_sum = norm_anc * results[f"field{field}_sum_x{gx}_y{gy}_time{time_value}"]
            magnitude_diff = norm_anc * results[f"field{field}_diff_x{gx}_y{gy}_time{time_value}"]
            # Positive when (sum >= diff) agrees with the reference being
            # non-negative, negative otherwise.
            if (magnitude_sum >= magnitude_diff and Ezref >= 0) or (magnitude_sum < magnitude_diff and Ezref < 0):
                Field_values.append(magnitude)
            else:
                Field_values.append(-magnitude)

        # Map remaining 10% (90-100%) to result processing
        _notify(90.0 + ((idx + 1) / total_frames) * 10.0)

    emit("QPU simulation completed.")
    return Field_values
def transpile_circ(circ, basis_gates=None):
    """
    Transpile ``circ`` to the given basis gates.

    Parameters
    ----------
    circ : QuantumCircuit
        Circuit to transpile.
    basis_gates : list[str], optional
        Target basis. When ``None``, a default set of single-qubit Pauli,
        phase, Hadamard and rotation gates plus ``ecr``, ``cz`` and ``cx``
        is used.

    Returns
    -------
    QuantumCircuit
        The transpiled circuit.
    """
    default_basis = ['z', 'y', 'x', 'sdg', 's', 'h', 'rz', 'ry', 'rx', 'ecr', 'cz', 'cx']
    target_basis = default_basis if basis_gates is None else basis_gates
    return transpile(circ, basis_gates=target_basis)
def create_impulse_state(grid_dims, impulse_pos):
    """Return a state vector holding a single unit impulse.

    The vector has ``4 * width * height`` entries, all zero except a ``1``
    at the row-major index ``y * width + x`` of the impulse position.

    Parameters
    ----------
    grid_dims : tuple
        ``(width, height)`` of the grid.
    impulse_pos : tuple
        ``(x, y)`` position of the impulse on the grid.

    Returns
    -------
    numpy.ndarray
        1-D float array of length ``4 * width * height``.

    Raises
    ------
    ValueError
        If the impulse position lies outside the grid.
    """
    grid_width, grid_height = grid_dims
    impulse_x, impulse_y = impulse_pos

    # Reject positions that are not on the grid.
    x_on_grid = 0 <= impulse_x < grid_width
    y_on_grid = 0 <= impulse_y < grid_height
    if not x_on_grid or not y_on_grid:
        raise ValueError(f"Impulse position ({impulse_x}, {impulse_y}) is outside the "
                         f"grid dimensions ({grid_width}x{grid_height}).")

    # The simulation space is 4x the grid size; the impulse sits at its
    # row-major grid index.
    initial_state = np.zeros(4 * (grid_width * grid_height))
    initial_state[impulse_y * grid_width + impulse_x] = 1
    return initial_state
###############################################################################################################################################
#ADAPT_AQC optimization part
import tempfile
import subprocess
import sys
import os
from pathlib import Path
from qiskit import QuantumCircuit
from qiskit.qasm3 import dumps, loads
# --- Dynamic Path Detection ---
# Layout assumed by the lookups below:
#   <PROJECT_ROOT>/utils/EBU_Quantum/no_body/base_functions.py   (this file)
#   <PROJECT_ROOT>/utils/aqc_venv/                               (ADAPT-AQC venv)
# In Docker, PROJECT_ROOT is /home/user/app.
CURRENT_FILE = Path(__file__).resolve()

# parents[0] = no_body, parents[1] = EBU_Quantum, parents[2] = utils
UTILS_DIR = CURRENT_FILE.parents[2]

# The virtual environment lives directly inside utils/.
VENV_DIR = UTILS_DIR.joinpath("aqc_venv")

# Parent of utils/; used as the subprocess working directory so that
# 'utils....' module imports resolve.
PROJECT_ROOT = UTILS_DIR.parent

# Interpreter location differs between Windows and POSIX virtual environments.
_venv_python_parts = ("Scripts", "python.exe") if sys.platform == "win32" else ("bin", "python")
ADAPT_PYTHON = str(VENV_DIR.joinpath(*_venv_python_parts))

# Module path of the optimizer entry point, importable from PROJECT_ROOT.
ADAPT_MODULE = "utils.EBU_Quantum.no_body.run_adapt_aqc"
def generate_optimized_circuit(qc: QuantumCircuit) -> QuantumCircuit:
    """
    Optimize ``qc`` with ADAPT-AQC in a separate interpreter.

    The circuit is serialized to an OpenQASM 3 string, piped to the
    ``ADAPT_PYTHON`` interpreter on stdin, and the optimized circuit is read
    back as OpenQASM 3 from its stdout. No temporary files are used.

    Parameters
    ----------
    qc : QuantumCircuit
        Circuit to optimize.

    Returns
    -------
    QuantumCircuit
        Circuit parsed from the subprocess output.

    Raises
    ------
    RuntimeError
        If the subprocess exits with a non-zero code (its stdout and stderr
        are printed first).
    Exception
        Whatever ``qiskit.qasm3.loads`` raises when the returned text cannot
        be parsed; the first 500 characters of the output are printed first.
    """
    # Inline script executed inside the adapt-aqc environment. It extends
    # sys.path with the research libraries (relative to the working
    # directory), reads QASM3 from stdin and writes QASM3 to stdout.
    code = f"""
import sys
import os
# Add paths to the research libraries
# CWD will be PROJECT_ROOT (e.g., /home/user/app in Docker)
sys.path.append(os.path.join(os.getcwd(), 'utils', 'aqc-research'))
sys.path.append(os.path.join(os.getcwd(), 'utils', 'adapt-aqc'))
from qiskit.qasm3 import loads, dumps
# Now we can import the module
from {ADAPT_MODULE} import run_adapt_aqc
# Read entire stdin safely
data = sys.stdin.read()
# Convert QASM3 string back to circuit
qc = loads(data)
# Optimize
qc_opt = run_adapt_aqc(qc)
# Print optimized circuit as QASM3
sys.stdout.write(dumps(qc_opt))
"""

    # cwd is PROJECT_ROOT so that imports like 'utils...' work in the child.
    completed = subprocess.run(
        [ADAPT_PYTHON, "-c", code],
        input=dumps(qc),
        text=True,
        capture_output=True,
        cwd=str(PROJECT_ROOT),
    )

    exit_code = completed.returncode
    if exit_code != 0:
        print("STDOUT:\n", completed.stdout)
        print("STDERR:\n", completed.stderr)
        raise RuntimeError(f"Adapt-AQC failed with exit code {exit_code}")

    # Parse the optimized QASM 3 back into a circuit.
    try:
        return loads(completed.stdout)
    except Exception as err:
        print("⚠️ QASM3 parse error on return. Output was:")
        print(completed.stdout[:500])
        raise err