Spaces:
Runtime error
Runtime error
| import numpy as np | |
| import scipy.sparse as sp | |
| import math | |
| import random | |
| import time | |
| import psutil | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| import os | |
| class MissingCredentialError(RuntimeError): | |
| """Raised when a required API key/secret is not available at runtime.""" | |
| def _require_env(var_name: str, *, context: str) -> str: | |
| value = os.environ.get(var_name) | |
| if value is None or str(value).strip() == "": | |
| raise MissingCredentialError( | |
| f"Missing required secret '{var_name}'. " | |
| f"Set it as a runtime environment variable (e.g., Hugging Face Space → Settings → Secrets) " | |
| f"before running {context}." | |
| ) | |
| return value | |
| import matplotlib.pyplot as plt | |
| from scipy.special import jn | |
| from scipy.sparse import identity, csr_matrix, kron, diags, eye | |
| from qiskit.circuit import QuantumCircuit, QuantumRegister, ClassicalRegister | |
| from qiskit.circuit.library import MCXGate, MCPhaseGate, RXGate, CRXGate, QFTGate, StatePreparation, PauliEvolutionGate, RZGate | |
| from qiskit.quantum_info import SparsePauliOp, Statevector, Operator, Pauli | |
| from scipy.linalg import expm | |
| # from tools import * | |
| from qiskit.qasm3 import dumps # QASM 3 exporter | |
| from qiskit.qasm3 import loads | |
| from qiskit.circuit.library import QFT | |
| from qiskit.primitives import StatevectorEstimator | |
| from qiskit import transpile | |
| from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager | |
| from qiskit_aer import Aer | |
| from qiskit_ibm_runtime import QiskitRuntimeService, EstimatorV2 as Estimator, Batch | |
| from qiskit_ionq import IonQProvider | |
| def Wj_block(j, n, ctrl_state, theta, lam, name='Wj_block', xgate=False): | |
| if not xgate: | |
| name = f' $W_{j}_block$ ' | |
| qc=QuantumCircuit(n + j, name=name) | |
| if j > 1: | |
| qc.cx(n + j-1, range(n, n+j-1)) | |
| if lam != 0: | |
| qc.p(lam, n + j -1) | |
| qc.h(n + j -1) | |
| if xgate and j>1: | |
| if isinstance(xgate, (list, tuple)): # selective application | |
| for idx, flag in enumerate(xgate): | |
| if flag: # only apply where flag == 1 | |
| qc.x(n + idx) | |
| elif xgate is True: # apply to all | |
| qc.x(range(n, n+j-1)) | |
| # the multicontrolled rz gate | |
| # it will be decomposed in qiskit | |
| if j > 1: | |
| mcrz = RZGate(theta).control(len(ctrl_state) + j-1, ctrl_state = "1"*(j-1)+ctrl_state) | |
| qc.append(mcrz, range(0, n + j)) | |
| else: | |
| mcrz = RZGate(theta).control(len(ctrl_state), ctrl_state = ctrl_state) | |
| qc.append(mcrz, range(0, n+j)) | |
| if xgate and j>1: | |
| if isinstance(xgate, (list, tuple)): # selective application | |
| for idx, flag in enumerate(xgate): | |
| if flag: # only apply where flag == 1 | |
| qc.x(n + idx) | |
| elif xgate is True: # apply to all | |
| qc.x(range(n, n+j-1)) | |
| qc.h(n+ j-1) | |
| if lam != 0: | |
| qc.p(-lam, n + j-1) | |
| if j > 1: | |
| qc.cx(n + j-1, range(n, n +j-1)) | |
| return qc.to_gate(label=name) | |
| def V1(nx, dt, name = "V1"): | |
| n = int(np.ceil(np.log2(nx))) | |
| derivatives = QuantumRegister(2*n) | |
| blocks = QuantumRegister(2) | |
| qc = QuantumCircuit(derivatives, blocks) | |
| W1 = Wj_block(2, n, "0"*n, -dt , 0, xgate=True) | |
| qc.append(W1, list(derivatives[0:n])+list(blocks[:])) | |
| # qc.barrier() | |
| W2 = Wj_block(3, n-1, "1"*(n-1), dt , 0, xgate=[0,1]) | |
| qc.append(W2, list(derivatives[1:n])+[derivatives[0]]+list(blocks[:])) | |
| # # qc.barrier() | |
| W3 = Wj_block(1, n+1, "0"*(n+1), dt , 0, xgate=False) | |
| qc.append(W3, list(derivatives[n:2*n])+list(blocks[:])) | |
| # # qc.barrier() | |
| W4 = Wj_block(2, n, "0"+"1"*(n-1), -dt , 0, xgate=False) | |
| qc.append(W4, list(derivatives[n+1:2*n]) + [blocks[0]] + [derivatives[n]] + [blocks[1]]) | |
| return qc | |
| def V2(nx, dt, name = "V2"): | |
| n = int(np.ceil(np.log2(nx))) | |
| derivatives = QuantumRegister(2*n) | |
| blocks = QuantumRegister(2) | |
| qc = QuantumCircuit(derivatives, blocks) | |
| W1 = Wj_block(2, 0, "", -2*dt , -np.pi/2, xgate=True) | |
| qc.append(W1, list(blocks[:])) | |
| # qc.barrier() | |
| for j in range(1, n+1): | |
| W2 = Wj_block(2+j, 0, "", 2*dt , -np.pi/2, xgate=[1]*(j-1)+[0,1]) | |
| qc.append(W2, list(derivatives[0:j])+list(blocks[:])) | |
| # qc.barrier() | |
| W3 = Wj_block(2, n, "0"*n, -dt , -np.pi/2, xgate=True) | |
| qc.append(W3, list(derivatives[0:n])+list(blocks[:])) | |
| # qc.barrier() | |
| W4 = Wj_block(2, n, "1"*n, 2*dt , -np.pi/2, xgate=True) | |
| qc.append(W4, list(derivatives[0:n])+list(blocks[:])) | |
| # qc.barrier() | |
| W5 = Wj_block(3, n-1, "1"*(n-1), dt , -np.pi/2, xgate=[0,1]) | |
| qc.append(W5, list(derivatives[1:n])+[derivatives[0]]+list(blocks[:])) | |
| # qc.barrier() | |
| W6 = Wj_block(1, 1, "0", 2*dt , -np.pi/2, xgate=False) | |
| qc.append(W6, list(blocks[:])) | |
| # qc.barrier() | |
| for j in range(1, n+1): | |
| W7 = Wj_block(1+j, 1, "0", -2*dt , -np.pi/2, xgate=[1]*(j-1)) | |
| qc.append(W7, [blocks[0]]+list(derivatives[n:n+j])+[blocks[1]]) | |
| # qc.barrier() | |
| W8 = Wj_block(1, n+1, "0"*(n+1), dt , -np.pi/2, xgate=False) | |
| qc.append(W8, list(derivatives[n:2*n])+list(blocks[:])) | |
| # qc.barrier() | |
| W9 = Wj_block(1, n+1, "0"+"1"*(n), -2*dt , -np.pi/2, xgate=False) | |
| qc.append(W9, list(derivatives[n:2*n])+list(blocks[:])) | |
| # qc.barrier() | |
| W10 = Wj_block(2, n, "0"+"1"*(n-1), -dt , -np.pi/2, xgate=False) | |
| qc.append(W10, list(derivatives[n+1:2*n]) + [blocks[0]] + [derivatives[n]] + [blocks[1]]) | |
| # qc.barrier() | |
| return qc | |
| def schro(nx, na, R, dt, initial_state, steps): | |
| nq = int(np.ceil(np.log2(nx))) | |
| # warped phase transformation | |
| dp = 2 * R * np.pi / 2**na | |
| p = np.arange(- R * np.pi, R * np.pi, step=dp) | |
| fp = np.exp(-np.abs(p)) | |
| norm1 = np.linalg.norm(fp[2**(na-1):]) # norm of p>=0 | |
| # construct quantum circuit | |
| system = QuantumRegister(2*nq+2, name='system') | |
| ancilla = QuantumRegister(na, name='ancilla') | |
| qc = QuantumCircuit(system, ancilla) | |
| # initialization | |
| prep = StatePreparation(initial_state) | |
| anc_prep = StatePreparation(fp / np.linalg.norm(fp)) | |
| qc.append(prep, system) | |
| qc.append(anc_prep, ancilla) | |
| # QFT | |
| qc.append(QFTGate(na), ancilla) | |
| qc.x(ancilla[-1]) | |
| A1 = V1(nx, dt, name = "V1").to_gate() | |
| A2 = V2(nx, dt, name = "V2").to_gate() | |
| # Hamiltonian simulation for Nt steps | |
| for i in range(steps): | |
| # circuit for one step | |
| for j in range(na): | |
| # repeat controlled H1 for 2**j times | |
| qc.append(A1.control().repeat(2**j), [ancilla[j]] + system[:]) | |
| # qc.append(A1.inverse().control(ctrl_state = "0").repeat(2**(na-1)), [ancilla[na-1]] + system[:]) | |
| qc.append(A1.inverse().repeat(2**(na-1)), system[:]) | |
| qc.append(A2, system[:]) | |
| # rearrange eta | |
| qc.x(ancilla[-1]) | |
| qc.append(QFTGate(na).inverse(), ancilla) | |
| return qc | |
| def circ_for_magnitude(field, x, y, nx, na, R, dt, initial_state, steps): | |
| qc = schro(nx, na, R, dt, initial_state, steps) | |
| naimark = QuantumRegister(1, name='Naimark') | |
| qc.add_register(naimark) | |
| if field == 'Ez': | |
| index = nx * y + x | |
| elif field == 'Hx': | |
| index = 2*nx*nx + nx * y + x | |
| else: | |
| index = 3*nx*nx + nx * y + x | |
| index_bin = format(index, f'0{qc.num_qubits-2}b') | |
| ctrl_state = '1' + index_bin | |
| ctrl_qubits = qc.qubits[:-1] | |
| qc.mcx(ctrl_qubits, naimark[0], ctrl_state=ctrl_state) | |
| return qc | |
| def circuits_for_sign(field, x, y, nx, na, dt, R, initial_state, steps, xref, yref, field_ref = 'Ez'): | |
| qc = schro(nx, na, R, dt, initial_state, steps) | |
| naimark = QuantumRegister(1, name='Naimark') | |
| qc.add_register(naimark) | |
| if field == 'Ez': | |
| index = nx * y + x | |
| elif field == 'Hx': | |
| index = 2*nx*nx + nx * y + x | |
| else: | |
| index = 3*nx*nx + nx * y + x | |
| if field_ref == 'Ez': | |
| index_ref = nx * yref + xref | |
| elif field_ref == 'Hx': | |
| index_ref = 2*nx*nx + nx * yref + xref | |
| else: | |
| index_ref = 3*nx*nx + nx * yref + xref | |
| index_bin = [(index >> i) & 1 for i in range(qc.num_qubits-2)] | |
| index_ref_bin = [(index_ref >> i) & 1 for i in range(qc.num_qubits-2)] | |
| index_bin.append(1) | |
| index_ref_bin.append(1) | |
| #Convert reference bitstring to 00000 | |
| for i, bit in enumerate(index_ref_bin): | |
| if bit == 1: | |
| qc.x(i) | |
| d_bits = [b ^ r for b, r in zip(index_ref_bin, index_bin)] | |
| control = d_bits.index(1) | |
| #Convert the other bitstring to 0001000 | |
| for target, bit in enumerate(d_bits): | |
| if bit == 1 and target != control: | |
| qc.cx(control, target) | |
| qc.h(control) | |
| ctrl_state_sum = '0'*(qc.num_qubits-1) | |
| ctrl_state_diff = '0'*(qc.num_qubits-1-control-1)+'1'+'0'*(control) | |
| qcdiff = qc.copy() | |
| ctrl_qubits = qc.qubits[:-1] | |
| qc.mcx(ctrl_qubits, naimark[0], ctrl_state=ctrl_state_sum) | |
| qcdiff.mcx(ctrl_qubits, naimark[0], ctrl_state=ctrl_state_diff) | |
| return qc, qcdiff | |
| def get_absolute_field_values(all_circuits, shots, pm_optimization_level, simulation = "True", platform = "IBM", progress_callback=None, print_callback=None): | |
| """ | |
| Execute circuits on quantum backend and return field values. | |
| Parameters | |
| ---------- | |
| progress_callback : callable, optional | |
| Function to report progress (40-90 range for Step 2 execution) | |
| print_callback : callable, optional | |
| Function for logging messages | |
| """ | |
| import time as time_module | |
| def _log(msg): | |
| if print_callback: | |
| print_callback(msg) | |
| def _progress(pct, message=None): | |
| if progress_callback: | |
| try: | |
| progress_callback(pct, message) | |
| except TypeError: | |
| # Old-style callback without message parameter | |
| try: | |
| progress_callback(pct) | |
| except Exception: | |
| pass | |
| except Exception: | |
| pass | |
| jobs = {} | |
| job_status = {key: "QUEUED" for key in jobs} | |
| total_circuits = len(all_circuits) | |
| _log(f"Step 2: Submitting {total_circuits} circuit(s) for execution...") | |
| _progress(40, f"Submitting {total_circuits} circuits...") | |
| if simulation=="True": | |
| backend = Aer.get_backend('qasm_simulator') | |
| _log(f"Using simulator backend: {backend.name()}") | |
| _progress(42, f"Backend: {backend.name()}") | |
| pm = generate_preset_pass_manager(backend=backend, optimization_level=pm_optimization_level) | |
| with Batch(backend=backend, max_time="2h") as batch: | |
| estimator = Estimator() | |
| for idx, (key, qc) in enumerate(all_circuits.items()): | |
| qc = transpile(qc, basis_gates=['u', 'cx', 'rz', 'sx', 'x']) | |
| pauli_label = 'Z'+'I'*(qc.num_qubits-1) | |
| qc_compiled = pm.run(qc) | |
| layout = qc_compiled.layout | |
| observable = SparsePauliOp(Pauli(pauli_label)).apply_layout(layout) | |
| job = estimator.run([(qc_compiled,observable)], precision=1/np.sqrt(shots)) | |
| jobs[key] = job | |
| # Progress: 42-50% for submission | |
| submit_pct = 42 + ((idx + 1) / total_circuits) * 8 | |
| _progress(submit_pct, f"Submitted circuit {idx+1}/{total_circuits}") | |
| elif simulation == "False" and platform == "IBM": | |
| _log("Connecting to IBM Quantum service...") | |
| _progress(42, "Connecting to IBM Quantum...") | |
| ibm_token = _require_env("API_KEY_IBM_EM", context="IBM EM QPU execution") | |
| service = QiskitRuntimeService( | |
| channel="ibm_cloud", | |
| token=ibm_token, | |
| instance="crn:v1:bluemix:public:quantum-computing:us-east:a/15157e4350c04a9dab51b8b8a4a93c86:e29afd91-64bf-4a82-8dbf-731e6c213595::", | |
| ) | |
| backend = service.least_busy(operational=True, simulator=False) | |
| _log(f"Selected IBM QPU backend: {backend.name}") | |
| _progress(45, f"Backend: {backend.name}") | |
| pm = generate_preset_pass_manager(backend=backend, optimization_level=pm_optimization_level) | |
| with Batch(backend=backend, max_time="2h") as batch: | |
| estimator = Estimator() | |
| estimator.options.resilience_level = 0 | |
| estimator.options.dynamical_decoupling.enable = True | |
| estimator.options.dynamical_decoupling.sequence_type = "XpXm" | |
| estimator.options.resilience.measure_mitigation = True | |
| estimator.options.resilience.zne_mitigation = True | |
| estimator.options.resilience.zne.noise_factors = (1.1, 1.3, 1.5) | |
| estimator.options.resilience.zne.extrapolator = ("exponential", "linear") | |
| for idx, (key, qc) in enumerate(all_circuits.items()): | |
| qc = transpile(qc, basis_gates=['u', 'cx', 'rz', 'sx', 'x']) | |
| pauli_label = 'Z'+'I'*(qc.num_qubits-1) | |
| qc_compiled = pm.run(qc) | |
| layout = qc_compiled.layout | |
| observable = SparsePauliOp(Pauli(pauli_label)).apply_layout(layout) | |
| job = estimator.run([(qc_compiled,observable)], precision=1/np.sqrt(shots)) | |
| jobs[key] = job | |
| # Progress: 45-55% for submission | |
| submit_pct = 45 + ((idx + 1) / total_circuits) * 10 | |
| _progress(submit_pct, f"Submitted circuit {idx+1}/{total_circuits}") | |
| elif simulation == "False" and platform == "IONQ": | |
| _log("Connecting to IonQ service...") | |
| _progress(42, "Connecting to IonQ...") | |
| ionq_token = _require_env("API_KEY_IONQ_EM", context="IonQ EM QPU execution") | |
| os.environ.setdefault("IONQ_API_TOKEN", ionq_token) | |
| provider = IonQProvider() | |
| ionq_backend = provider.get_backend("qpu.forte-enterprise-1") | |
| _log(f"Selected IonQ backend: {ionq_backend.name}") | |
| _progress(45, f"Backend: {ionq_backend.name}") | |
| pm = generate_preset_pass_manager(backend=ionq_backend, optimization_level=1) | |
| with Batch(backend=ionq_backend, max_time="2h") as batch: | |
| estimator = Estimator() | |
| for idx, (key, qc) in enumerate(all_circuits.items()): | |
| pauli_label = 'Z'+'I'*(qc.num_qubits-1) | |
| qc_compiled = transpile(qc,basis_gates=['ry','rz','rx','h','cx']) | |
| layout = qc_compiled.layout | |
| observable = SparsePauliOp(Pauli(pauli_label)).apply_layout(layout) | |
| job = estimator.run([(qc_compiled,observable)], precision=1/np.sqrt(shots)) | |
| jobs[key] = job | |
| submit_pct = 45 + ((idx + 1) / total_circuits) * 10 | |
| _progress(submit_pct, f"Submitted circuit {idx+1}/{total_circuits}") | |
| # Poll for job completion with status updates (55-85%) | |
| _log("Jobs submitted. Waiting for results...") | |
| _progress(55, "Jobs queued, waiting for execution...") | |
| total_jobs = len(jobs) | |
| completed_jobs = 0 | |
| job_keys = list(jobs.keys()) | |
| pending_keys = set(job_keys) | |
| poll_count = 0 | |
| max_polls = 600 # ~10 minutes with 1s interval | |
| while pending_keys and poll_count < max_polls: | |
| for key in list(pending_keys): | |
| try: | |
| job = jobs[key] | |
| status = job.status() | |
| status_name = status.name if hasattr(status, 'name') else str(status) | |
| if status_name != job_status.get(key): | |
| job_status[key] = status_name | |
| _log(f"Job {key[:30]}... Status: {status_name}") | |
| if status_name in ('DONE', 'ERROR', 'CANCELLED'): | |
| pending_keys.discard(key) | |
| completed_jobs += 1 | |
| # Progress: 55-85% based on completion | |
| completion_pct = 55 + (completed_jobs / total_jobs) * 30 | |
| _progress(completion_pct, f"Completed {completed_jobs}/{total_jobs} jobs") | |
| except Exception as e: | |
| pass | |
| if pending_keys: | |
| # Show indeterminate progress while waiting | |
| queued_count = sum(1 for s in job_status.values() if s in ('QUEUED', 'VALIDATING', 'INITIALIZING')) | |
| running_count = sum(1 for s in job_status.values() if s == 'RUNNING') | |
| if running_count > 0: | |
| # Slowly increment while running (55-85 range) | |
| run_progress = 55 + min(30, poll_count * 0.1) | |
| _progress(run_progress, f"Running... ({running_count} active, {queued_count} queued)") | |
| elif queued_count > 0: | |
| queue_progress = 55 + min(10, poll_count * 0.05) | |
| _progress(queue_progress, f"Queued... (waiting {poll_count}s)") | |
| time_module.sleep(1) | |
| poll_count += 1 | |
| _log("All jobs completed. Retrieving results...") | |
| _progress(87, "Retrieving results...") | |
| ######################################################################################################### | |
| results = {} | |
| for idx, (key, job) in enumerate(jobs.items()): | |
| res = job.result()[0] | |
| z_exp = res.data.evs.item() # expectation value | |
| results[key] = np.sqrt((1 - z_exp) / 2) | |
| # Progress: 87-90% for result retrieval | |
| result_pct = 87 + ((idx + 1) / total_jobs) * 3 | |
| _progress(result_pct, f"Retrieved result {idx+1}/{total_jobs}") | |
| _progress(90, "All results retrieved") | |
| ######################################################################################### | |
| return results | |
| def build_optimized_circuits(nx, impulse_pos, field, grid_point, time_values, optimization="True", progress_callback=None): | |
| na = 1 | |
| dt = 0.1 | |
| R = 4 | |
| xref, yref = impulse_pos | |
| x, y = grid_point | |
| grid_dims = (nx, nx) | |
| initial_state = create_impulse_state(grid_dims, impulse_pos) | |
| dp = 2 * R * np.pi / 2**na | |
| p = np.arange(-R * np.pi, R * np.pi, step=dp) | |
| fp = np.exp(-np.abs(p)) | |
| norm_anc = np.linalg.norm(fp[2**(na - 1):]) # norm of p>=0 | |
| all_circuits = {} | |
| norm_offset_ref = {} | |
| MAX_THREADS = 20 # soft upper bound | |
| MEMORY_THRESHOLD = 75 # maximum % RAM to allow for parallel tasks | |
| futures = {} | |
| def submit_task_safe(executor, func, *args): | |
| """Submit task only if memory usage is below threshold.""" | |
| while psutil.virtual_memory().percent > MEMORY_THRESHOLD: | |
| time.sleep(0.5) # wait until memory drops | |
| return executor.submit(func, *args) | |
| with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor: | |
| total_steps = len(time_values) | |
| for idx, time_value in enumerate(time_values): | |
| steps = int(math.ceil(time_value / dt)) | |
| offset_ref = 0 if steps < 9 else 0.31 | |
| deltastate = np.zeros(4 * nx * nx) | |
| deltastate[nx * yref + xref] = 1 | |
| deltastate[0:nx * nx] += offset_ref | |
| norm_ref_state = np.linalg.norm(deltastate) | |
| initial_state_ref = deltastate / norm_ref_state | |
| key_norm_offset_ref = f"norm_offset_ref_time{time_value}" | |
| norm_offset_ref[key_norm_offset_ref] = (norm_ref_state, offset_ref) | |
| # Build reference circuit | |
| qc_ref = circ_for_magnitude("Ez", xref, yref, nx, na, R, dt, initial_state_ref, steps) | |
| key_ref = f"fieldEz_ref_time{time_value}" | |
| if not (field == "Ez" and (x, y) == (xref, yref)): | |
| circ_magnitude = circ_for_magnitude(field, x, y, nx, na, R, dt, initial_state, steps) | |
| circsum, circdiff = circuits_for_sign(field, x, y, nx, na, dt, R,initial_state, steps, xref, yref, field_ref='Ez') | |
| if optimization == "True": | |
| # Submit all 4 optimization tasks into the global pool | |
| futures[submit_task_safe(executor, generate_optimized_circuit, qc_ref)] = key_ref | |
| futures[submit_task_safe(executor, generate_optimized_circuit, circ_magnitude)] = ( | |
| f"field{field}_magnitude_x{x}_y{y}_time{time_value}" | |
| ) | |
| futures[submit_task_safe(executor, generate_optimized_circuit, circsum)] = ( | |
| f"field{field}_sum_x{x}_y{y}_time{time_value}" | |
| ) | |
| futures[submit_task_safe(executor, generate_optimized_circuit, circdiff)] = ( | |
| f"field{field}_diff_x{x}_y{y}_time{time_value}" | |
| ) | |
| else: | |
| # No optimization → store directly | |
| all_circuits[key_ref] = qc_ref | |
| all_circuits[f"field{field}_magnitude_x{x}_y{y}_time{time_value}"] = circ_magnitude | |
| all_circuits[f"field{field}_sum_x{x}_y{y}_time{time_value}"] = circsum | |
| all_circuits[f"field{field}_diff_x{x}_y{y}_time{time_value}"] = circdiff | |
| else: | |
| # Reference point | |
| if optimization == "True": | |
| futures[submit_task_safe(executor, generate_optimized_circuit, qc_ref)] = key_ref | |
| else: | |
| all_circuits[key_ref] = qc_ref | |
| # If NOT optimizing, report progress here (0-40%) | |
| if optimization != "True" and progress_callback: | |
| pct = ((idx + 1) / total_steps) * 40.0 | |
| try: | |
| progress_callback(pct) | |
| except Exception: | |
| pass | |
| # Collect results as they finish | |
| total_futures = len(futures) | |
| completed_count = 0 | |
| if total_futures > 0: | |
| for future in as_completed(futures): | |
| key = futures[future] | |
| all_circuits[key] = future.result() | |
| completed_count += 1 | |
| if progress_callback: | |
| # Map 0-100% of this task to 0-40% of total progress | |
| pct = (completed_count / total_futures) * 40.0 | |
| try: | |
| progress_callback(pct) | |
| except Exception: | |
| pass | |
| elif progress_callback: | |
| # If no futures (no optimization), we are done with this step instantly | |
| try: | |
| progress_callback(40.0) | |
| except Exception: | |
| pass | |
| return all_circuits, norm_offset_ref, norm_anc | |
| def create_time_frames(total_time, snapshot_interval): | |
| """ | |
| Create a list of time frames for simulation snapshots. | |
| Matches the logic from delta_impulse_generator.py run_sve(). | |
| """ | |
| dt = 0.1 | |
| tol = 1e-9 | |
| try: | |
| T_val = float(total_time) | |
| except (ValueError, TypeError): | |
| return [] | |
| if T_val <= 0: | |
| return [] | |
| steps = int(np.floor(T_val / dt)) | |
| if steps <= 0: | |
| return [0.0] | |
| T_eff = steps * dt | |
| try: | |
| snapshot_dt_val = float(snapshot_interval) | |
| except (ValueError, TypeError): | |
| snapshot_dt_val = dt | |
| if snapshot_dt_val < dt: | |
| snapshot_dt_val = dt | |
| k = max(1, int(round(snapshot_dt_val / dt))) | |
| snapshot_dt_eff = k * dt | |
| times = np.arange(0, T_eff + tol, snapshot_dt_eff) | |
| if abs(times[-1] - T_eff) > tol: | |
| times = np.append(times, T_eff) | |
| times = np.round(times, 12) | |
| unique_times = [] | |
| for t in times: | |
| if not unique_times or abs(t - unique_times[-1]) > tol: | |
| unique_times.append(float(t)) | |
| return unique_times | |
| def get_field_values(field, x, y, T, snapshot_time, nx, impulse_pos, shots, pm_optimization_level, simulation="True", optimization="True", platform="IBM", progress_callback=None, print_callback=None): | |
| """ | |
| Run quantum simulation for a single field component at a single position | |
| and return time-series field values. | |
| This function is designed for IBM QPU with the restriction that only ONE field | |
| component and ONE position coordinate can be processed at a time. | |
| Parameters | |
| ---------- | |
| field : str | |
| Field component to measure: 'Ez', 'Hx', or 'Hy'. | |
| x : int | |
| X grid index of the monitor position. | |
| ** IBM QPU restriction: Only ONE position allowed! ** | |
| y : int | |
| Y grid index of the monitor position. | |
| T : float | |
| Total simulation time. | |
| snapshot_time : float | |
| Time interval between snapshots. | |
| nx : int | |
| Grid dimension (grid is nx x nx). | |
| impulse_pos : tuple | |
| (x, y) integer grid indices of the initial impulse. | |
| shots : int | |
| Number of shots for QPU execution. | |
| pm_optimization_level : int | |
| Pass manager optimization level (0-3). | |
| simulation : str | |
| "True" for simulator, "False" for real QPU. | |
| optimization : str | |
| "True" to use ADAPT-AQC optimization, "False" otherwise. | |
| platform : str | |
| "IBM" or "IONQ". | |
| progress_callback : callable, optional | |
| Function to report progress (0-100). | |
| print_callback : callable, optional | |
| Function for logging messages. | |
| Returns | |
| ------- | |
| field_values : list[float] | |
| Time-series of field values at the specified position. | |
| """ | |
| def _log(msg): | |
| if print_callback: | |
| print_callback(msg) | |
| else: | |
| print(msg) | |
| xref, yref = impulse_pos | |
| grid_point = (int(x), int(y)) | |
| # Create time frames from T and snapshot_time (matching run_sve logic) | |
| time_values = create_time_frames(T, snapshot_time) | |
| total_frames = len(time_values) | |
| if total_frames == 0: | |
| _log("No valid time frames generated.") | |
| return [] | |
| _log(f"Starting QPU simulation: T={T}s, frames={total_frames}, platform={platform}") | |
| _log(f"Time frames: {time_values}") | |
| _log(f"Impulse position (grid): {impulse_pos}") | |
| _log(f"Monitor position (grid): {grid_point}") | |
| _log(f"Field component: {field}") | |
| # --- Step 1: Circuit Construction & Optimization (0-40%) --- | |
| _log("Step 1: Circuit Construction & Optimization") | |
| if progress_callback: | |
| try: | |
| progress_callback(0.0) | |
| except Exception: | |
| pass | |
| # Build circuits using existing logic | |
| # Pass progress_callback to track optimization progress (0-40%) | |
| all_circuits, norm_offset_ref, norm_anc = build_optimized_circuits(nx, impulse_pos, field, grid_point, time_values, optimization, progress_callback) | |
| # --- Step 2: Circuit Execution (40-90%) --- | |
| _log("Step 2: Circuit Execution") | |
| if progress_callback: | |
| try: | |
| progress_callback(40.0) | |
| except Exception: | |
| pass | |
| results = get_absolute_field_values(all_circuits, shots, pm_optimization_level, simulation, platform, progress_callback=progress_callback, print_callback=print_callback) | |
| # --- Step 3: Result Processing (90-100%) --- | |
| _log("Step 3: Result Processing") | |
| if progress_callback: | |
| try: | |
| progress_callback(90.0) | |
| except Exception: | |
| pass | |
| # Process results using existing logic | |
| Field_values = [] | |
| for idx, time_value in enumerate(time_values): | |
| key_ref = f"fieldEz_ref_time{time_value}" | |
| key_norm_offset_ref = f"norm_offset_ref_time{time_value}" | |
| Ezref = norm_offset_ref[key_norm_offset_ref][0]*norm_anc*results[key_ref]-norm_offset_ref[key_norm_offset_ref][1] | |
| if field == "Ez" and grid_point == (xref, yref): | |
| Field_values.append(Ezref) | |
| else: | |
| key_magnitude = f"field{field}_magnitude_x{grid_point[0]}_y{grid_point[1]}_time{time_value}" | |
| key_sum = f"field{field}_sum_x{grid_point[0]}_y{grid_point[1]}_time{time_value}" | |
| key_diff = f"field{field}_diff_x{grid_point[0]}_y{grid_point[1]}_time{time_value}" | |
| magnitude = norm_anc*results[key_magnitude] | |
| magnitude_sum = norm_anc*results[key_sum] | |
| magnitude_diff = norm_anc*results[key_diff] | |
| if (magnitude_sum >= magnitude_diff and Ezref >= 0) or (magnitude_sum < magnitude_diff and Ezref < 0): | |
| Field_values.append(magnitude) | |
| else: | |
| Field_values.append(-magnitude) | |
| if progress_callback: | |
| try: | |
| # Map remaining 10% (90-100%) to result processing | |
| progress = 90.0 + ((idx + 1) / total_frames) * 10.0 | |
| progress_callback(progress) | |
| except Exception: | |
| pass | |
| _log("QPU simulation completed.") | |
| return Field_values | |
| def transpile_circ(circ, basis_gates=None): | |
| """ | |
| Transpile the circuit to the specified basis gates. | |
| """ | |
| if basis_gates is None: | |
| basis_gates = ['z', 'y', 'x', 'sdg', 's', 'h', 'rz', 'ry', 'rx', 'ecr', 'cz', 'cx'] | |
| transpiled_circ = transpile(circ, basis_gates=basis_gates) | |
| return transpiled_circ | |
| def create_impulse_state(grid_dims, impulse_pos): | |
| grid_width, grid_height = grid_dims | |
| impulse_x, impulse_y = impulse_pos | |
| # --- Input Validation --- | |
| # Ensure the requested impulse position is actually on the grid. | |
| if not (0 <= impulse_x < grid_width and 0 <= impulse_y < grid_height): | |
| raise ValueError(f"Impulse position ({impulse_x}, {impulse_y}) is outside the " | |
| f"grid dimensions ({grid_width}x{grid_height}).") | |
| # --- 1. Calculate the 1D Array Index --- | |
| # Convert the (x, y) coordinate to a single index in a flattened 1D array. | |
| # The formula for row-major order is: index = y_coord * width + x_coord | |
| flat_index = impulse_y * grid_width + impulse_x | |
| # --- 2. Create the Full, Padded State Vector --- | |
| grid_size = grid_width * grid_height | |
| total_size = 4 * grid_size # The simulation space is 4x the grid size. | |
| initial_state = np.zeros(total_size) | |
| # --- 3. Set the Delta Impulse --- | |
| initial_state[flat_index] = 1 | |
| return initial_state | |
| ############################################################################################################################################### | |
| #ADAPT_AQC optimization part | |
| import tempfile | |
| import subprocess | |
| import sys | |
| import os | |
| from pathlib import Path | |
| from qiskit import QuantumCircuit | |
| from qiskit.qasm3 import dumps, loads | |
| # --- Dynamic Path Detection --- | |
| # This file is at: .../quantum/utils/EBU_Quantum/no_body/base_functions.py | |
| # In Docker: /home/user/app/utils/EBU_Quantum/no_body/base_functions.py | |
| # We need to find the 'utils' directory which is 3 levels up from this file | |
| CURRENT_FILE = Path(__file__).resolve() | |
| # Go up to the 'utils' directory (3 levels up from base_functions.py) | |
| # base_functions.py -> no_body -> EBU_Quantum -> utils | |
| UTILS_DIR = CURRENT_FILE.parents[2] | |
| # The aqc_venv is directly inside utils/ | |
| VENV_DIR = UTILS_DIR / "aqc_venv" | |
| # PROJECT_ROOT is the parent of 'utils' (needed for module imports in subprocess) | |
| # In Docker: /home/user/app | |
| # Locally: .../quantum (or .../webui_trame/quantum) | |
| PROJECT_ROOT = UTILS_DIR.parent | |
| if sys.platform == "win32": | |
| ADAPT_PYTHON = str(VENV_DIR / "Scripts" / "python.exe") | |
| else: | |
| ADAPT_PYTHON = str(VENV_DIR / "bin" / "python") | |
| # 3. Define the module path - this should work from PROJECT_ROOT | |
| ADAPT_MODULE = "utils.EBU_Quantum.no_body.run_adapt_aqc" | |
| def generate_optimized_circuit(qc: QuantumCircuit) -> QuantumCircuit: | |
| """ | |
| Run ADAPT-AQC optimization fully in memory using QASM 3 strings, | |
| safely encoded through stdin/stdout. | |
| """ | |
| # Serialize circuit to QASM 3 | |
| qasm_str = dumps(qc) | |
| # Inline script for the adapt-aqc environment. | |
| # Reads QASM3 from stdin and writes QASM3 to stdout. | |
| # We dynamically add the research library paths to sys.path inside the subprocess. | |
| code = f""" | |
| import sys | |
| import os | |
| # Add paths to the research libraries | |
| # CWD will be PROJECT_ROOT (e.g., /home/user/app in Docker) | |
| sys.path.append(os.path.join(os.getcwd(), 'utils', 'aqc-research')) | |
| sys.path.append(os.path.join(os.getcwd(), 'utils', 'adapt-aqc')) | |
| from qiskit.qasm3 import loads, dumps | |
| # Now we can import the module | |
| from {ADAPT_MODULE} import run_adapt_aqc | |
| # Read entire stdin safely | |
| data = sys.stdin.read() | |
| # Convert QASM3 string back to circuit | |
| qc = loads(data) | |
| # Optimize | |
| qc_opt = run_adapt_aqc(qc) | |
| # Print optimized circuit as QASM3 | |
| sys.stdout.write(dumps(qc_opt)) | |
| """ | |
| # Launch subprocess, pipe in QASM3 string | |
| # We set cwd to PROJECT_ROOT so imports like 'utils...' work correctly | |
| proc = subprocess.run( | |
| [ADAPT_PYTHON, "-c", code], | |
| input=qasm_str, | |
| text=True, | |
| capture_output=True, | |
| cwd=str(PROJECT_ROOT), | |
| ) | |
| if proc.returncode != 0: | |
| print("STDOUT:\n", proc.stdout) | |
| print("STDERR:\n", proc.stderr) | |
| raise RuntimeError(f"Adapt-AQC failed with exit code {proc.returncode}") | |
| # Parse optimized QASM 3 back into circuit | |
| try: | |
| qc_opt = loads(proc.stdout) | |
| except Exception as e: | |
| print("⚠️ QASM3 parse error on return. Output was:") | |
| print(proc.stdout[:500]) | |
| raise e | |
| return qc_opt |