from qiskit import QuantumCircuit,QuantumRegister,ClassicalRegister,transpile from qiskit.synthesis.qft import synth_qft_full as QFT import numpy as np import os from sympy import sympify, symbols, lambdify from qiskit_ibm_runtime import QiskitRuntimeService import plotly.graph_objects as go dim=3 QLBM_PLOT_COLORSCALE = "Turbo" def bin_to_gray(bin_s): XOR=lambda x,y: (x or y) and not (x and y) gray_s=bin_s[0] for i in range(len(bin_s)-1): c_bool=XOR(bool(int(bin_s[i])),bool(int(bin_s[i+1]))) gray_s+=str(int(c_bool)) return gray_s def gray_to_bin(gray_s): XOR=lambda x,y: (x or y) and not (x and y) bin_s=gray_s[0] for i in range(len(gray_s)-1): c_bool=XOR(bool(int(bin_s[i])),bool(int(gray_s[i+1]))) bin_s+=str(int(c_bool)) return bin_s def bin_to_int(bin_s): return int(bin_s,2) def int_to_bin(i,pad): return bin(i)[2:].zfill(pad) def fwht_approx(f,N,num_points_per_dim,threshold=1e-10): linear_block_size=int(N//num_points_per_dim) num_angles_per_block=int(np.log2(linear_block_size)) thetas={} for k in range(num_points_per_dim): for j in range(num_points_per_dim): for i in range(num_points_per_dim): avg_f=2*np.arccos(f(i*linear_block_size+(linear_block_size-1)/2,j*linear_block_size+(linear_block_size-1)/2,k*linear_block_size+(linear_block_size-1)/2)) thetas[k*(N**2)*linear_block_size+j*N*linear_block_size+i*linear_block_size]=avg_f slope_x=(2*np.arccos(f(i*linear_block_size,j*linear_block_size+(linear_block_size-1)/2,k*linear_block_size+(linear_block_size-1)/2))-2*np.arccos(f(((i+1)%N)*linear_block_size,j*linear_block_size+(linear_block_size-1)/2,k*linear_block_size+(linear_block_size-1)/2)))/linear_block_size slope_y=(2*np.arccos(f(i*linear_block_size+(linear_block_size-1)/2,j*linear_block_size,k*linear_block_size+(linear_block_size-1)/2))-2*np.arccos(f(i*linear_block_size+(linear_block_size-1)/2,((j+1)%N)*linear_block_size,k*linear_block_size+(linear_block_size-1)/2)))/linear_block_size slope_z=(2*np.arccos(f(i*linear_block_size+(linear_block_size-1)/2,j*linear_block_size+(linear_block_size-1)/2,k*linear_block_size))-2*np.arccos(f(i*linear_block_size+(linear_block_size-1)/2,j*linear_block_size+(linear_block_size-1)/2,((k+1)%N)*linear_block_size)))/linear_block_size for m in range(num_angles_per_block): thetas[k*(N**2)*linear_block_size+j*N*linear_block_size+i*linear_block_size + 2**m]=slope_x*(2**(m-1)) thetas[k*(N**2)*linear_block_size+j*N*linear_block_size+i*linear_block_size + N*(2**m)]=slope_y*(2**(m-1)) thetas[k*(N**2)*linear_block_size+j*N*linear_block_size+i*linear_block_size + (N**2)*(2**m)]=slope_z*(2**(m-1)) h = linear_block_size while h < N**3: for i in range(0, N**3, h * 2): if (i//N)%linear_block_size!=0: continue if (i//(N**2))%linear_block_size!=0: continue j=i while jthreshold: threshold=sum_-th break return [theta for theta in thetas.values() if abs(theta)>threshold],[key for key in thetas.keys() if abs(thetas[key])>threshold] def get_circuit_inputs(f,num_reg_qubits,num_points_per_dim): theta_vec,indices=fwht_approx(f,2**num_reg_qubits,num_points_per_dim,1e-4) circ_pos=[] for ind in indices: circ_pos+=[bin_to_int(gray_to_bin(int_to_bin(ind,num_reg_qubits*3)))] sorted_theta_vec=sorted(zip(theta_vec,circ_pos),key=lambda el:el[1]) ctrls=[] current_bs="0"*(3*num_reg_qubits) for el in sorted_theta_vec: new_bs=bin_to_gray(int_to_bin((el[1])%(2**(3*num_reg_qubits)),(3*num_reg_qubits))) ctrls += [[i for i, (char1, char2) in enumerate(zip(current_bs[::-1], new_bs[::-1])) if char1 != char2]] current_bs=new_bs new_bs="0"*(3*num_reg_qubits) ctrls += [[i for i, (char1, char2) in enumerate(zip(current_bs[::-1], new_bs[::-1])) if char1 != char2]] return [el[0] for el in sorted_theta_vec],ctrls def get_coeffs(n,ux,uy,uz,resolution=32): current_N=2**n x_coeffs,x_coeff_var_indices=get_circuit_inputs(lambda x,y,z: ((1+ux(x/current_N,y/current_N,z/current_N))/2)**0.5,n,min(current_N,resolution)) y_coeffs,y_coeff_var_indices=get_circuit_inputs(lambda x,y,z: ((1+uy(x/current_N,y/current_N,z/current_N))/2)**0.5,n,min(current_N,resolution)) z_coeffs,z_coeff_var_indices=get_circuit_inputs(lambda x,y,z: ((1+uz(x/current_N,y/current_N,z/current_N))/2)**0.5,n,min(current_N,resolution)) x_coeffs_,x_coeff_var_indices_=get_circuit_inputs(lambda x,y,z: 0 if (1+ux((x-1)/current_N,y/current_N,z/current_N))==0 else \ ((1+ux((x-1)/current_N,y/current_N,z/current_N))/(2+ux((x-1)/current_N,y/current_N,z/current_N)-ux((x+1)/current_N,y/current_N,z/current_N)))**0.5,n,min(current_N,resolution)) y_coeffs_,y_coeff_var_indices_=get_circuit_inputs(lambda x,y,z: 0 if (1+uy(x/current_N,(y-1)/current_N,z/current_N))==0 else \ ((1+uy(x/current_N,(y-1)/current_N,z/current_N))/(2+uy(x/current_N,(y-1)/current_N,z/current_N)-uy(x/current_N,(y+1)/current_N,z/current_N)))**0.5,n,min(current_N,resolution)) z_coeffs_,z_coeff_var_indices_=get_circuit_inputs(lambda x,y,z: 0 if (1+uz(x/current_N,y/current_N,(z-1)/current_N))==0 else \ ((1+uz(x/current_N,y/current_N,(z-1)/current_N))/(2+uz(x/current_N,y/current_N,(z-1)/current_N)-uz(x/current_N,y/current_N,(z+1)/current_N)))**0.5,n,min(current_N,resolution)) unprep1_coeffs,unprep1_coeff_var_indices=get_circuit_inputs(lambda x,y,z:\ (1/3**0.5)*(1+(ux((x-1)/current_N,y/current_N,z/current_N)-ux((x+1)/current_N,y/current_N,z/current_N))/2)**0.5,n,min(current_N,resolution)) unprep2_coeffs,unprep2_coeff_var_indices=get_circuit_inputs(lambda x,y,z:\ ((1+(uy(x/current_N,(y-1)/current_N,z/current_N)-uy(x/current_N,(y+1)/current_N,z/current_N))/2)/(2-(ux((x-1)/current_N,y/current_N,z/current_N)-ux((x+1)/current_N,y/current_N,z/current_N))/2))**0.5,n,min(current_N,resolution)) return x_coeffs,x_coeff_var_indices, y_coeffs,y_coeff_var_indices, z_coeffs,z_coeff_var_indices,\ x_coeffs_,x_coeff_var_indices_, y_coeffs_,y_coeff_var_indices_, z_coeffs_,z_coeff_var_indices_,\ unprep1_coeffs,unprep1_coeff_var_indices, unprep2_coeffs,unprep2_coeff_var_indices def get_coll_ops(n,ux,uy,uz,resolution=32): x_coeffs,x_coeff_var_indices, y_coeffs,y_coeff_var_indices, z_coeffs,z_coeff_var_indices,\ x_coeffs_,x_coeff_var_indices_, y_coeffs_,y_coeff_var_indices_, z_coeffs_,z_coeff_var_indices_,\ unprep1_coeffs,unprep1_coeff_var_indices, unprep2_coeffs,unprep2_coeff_var_indices = get_coeffs(n,ux,uy,uz,resolution) def prep(qc,pos_qr,dir_qr): qc.h(dir_qr[0]) qc.h(dir_qr[4]) qc.cx(dir_qr[0],dir_qr[2]) qc.ry(-np.pi/4,dir_qr[4]) qc.cx(dir_qr[2],dir_qr[4]) qc.ry(np.pi/4,dir_qr[4]) qc.cx(dir_qr[2],dir_qr[4]) qc.ry(-np.pi/4,dir_qr[2]) qc.cx(dir_qr[0],dir_qr[2]) qc.ry(np.pi/4,dir_qr[2]) qc.cx(dir_qr[0],dir_qr[2]) qc.cx(dir_qr[2],dir_qr[0]) qc.cx(dir_qr[0],dir_qr[1]) for i in range(len(x_coeff_var_indices)): for ind in x_coeff_var_indices[i]: qc.cx([q for reg in pos_qr for q in reg][ind],dir_qr[0]) if i=0 else str(-el)+"0" for el in dir_[::-1]]) for dir_ in dirs] dirs_state=np.zeros(2**7) for i,dir_ind in enumerate(dir_indices): ind=int(dir_ind,2) dirs_state[ind]=wts[i]**0.5 qc_list=[] prep, unprep=get_coll_ops(n,ux,uy,uz,vel_resolution) for T_total in T_list: pos_qr=[QuantumRegister(n) for _ in range(dim)] pos_cr=[ClassicalRegister(n) for _ in range(dim)] if midcircuit_meas: dir_qr=QuantumRegister(2*dim) else: dir_qr_list=[QuantumRegister(2*dim) for _ in range(T_total)] dir_qr_flag=QuantumRegister(2*dim) dir_cr=[ClassicalRegister((4 if flag_qubits and midcircuit_meas else 2)*dim) for _ in range(T_total+int(flag_qubits and not midcircuit_meas))] if flag_qubits: if midcircuit_meas: qc=QuantumCircuit(*pos_qr,dir_qr,dir_qr_flag,*pos_cr,*dir_cr) else: qc=QuantumCircuit(*pos_qr,*dir_qr_list,dir_qr_flag,*pos_cr,*dir_cr) else: if midcircuit_meas: qc=QuantumCircuit(*pos_qr,dir_qr,*pos_cr,*dir_cr) else: qc=QuantumCircuit(*pos_qr,*dir_qr_list,*pos_cr,*dir_cr) qc.compose(init_state_prep_circ,[qubit for qr in pos_qr for qubit in list(qr)], inplace=True) uniform_bool=False if ux_str is not None: if 'x' not in ux_str+uy_str+uz_str and 'y' not in ux_str+uy_str+uz_str and 'z' not in ux_str+uy_str+uz_str: uniform_bool=True if uniform_bool: for i in range(dim): qc.compose(QFT(n, inverse=False, do_swaps=False), pos_qr[i], inplace=True) for T in list(range(T_total))[::-1]: if not midcircuit_meas: dir_qr=dir_qr_list[T] prep(qc,pos_qr,dir_qr) if flag_qubits: for q1,q2 in zip(dir_qr,dir_qr_flag): qc.cx(q1,q2) if not uniform_bool: for i in range(dim): qc.compose(QFT(n, inverse=False, do_swaps=False), pos_qr[i], inplace=True) stream(qc,pos_qr,dir_qr,n) if not uniform_bool: for i in range(dim): qc.compose(QFT(n, inverse=True, do_swaps=False), pos_qr[i], inplace=True) if flag_qubits: for q1,q2 in zip(dir_qr,dir_qr_flag): qc.cx(q1,q2) unprep(qc,pos_qr,dir_qr) if flag_qubits and midcircuit_meas: qc.measure(list(dir_qr)+list(dir_qr_flag),dir_cr[T]) else: qc.measure(dir_qr,dir_cr[T]) if not midcircuit_meas and flag_qubits: qc.measure(dir_qr_flag,dir_cr[T_total]) if uniform_bool: for i in range(dim): qc.compose(QFT(n, inverse=True, do_swaps=False), pos_qr[i], inplace=True) if measure: for i in range(dim): qc.measure(pos_qr[i],pos_cr[i]) qc_list+=[qc] return qc_list def str_to_lambda(vx_param,vy_param,vz_param): vx_val = str(vx_param) vy_val = str(vy_param) vz_val = str(vz_param) x_sym, y_sym, z_sym = symbols('x y z') vx_sympified = sympify(vx_val) vy_sympified = sympify(vy_val) vz_sympified = sympify(vz_val) vx=lambdify((x_sym, y_sym, z_sym), vx_sympified, modules="numpy") vy=lambdify((x_sym, y_sym, z_sym), vy_sympified, modules="numpy") vz=lambdify((x_sym, y_sym, z_sym), vz_sympified, modules="numpy") return vx,vy,vz def get_named_init_state_circuit( n: int, init_state_name: str, # Sinusoidal parameters (frequency multipliers) sine_k_x: float = 1.0, sine_k_y: float = 1.0, sine_k_z: float = 1.0, # Gaussian parameters gauss_cx: float = None, # Center X (0-1 normalized), defaults to 0.5 gauss_cy: float = None, # Center Y (0-1 normalized), defaults to 0.5 gauss_cz: float = None, # Center Z (0-1 normalized), defaults to 0.5 gauss_sigma: float = None, # Spread, defaults to 0.2 in normalized units # Multi-dirac-delta parameters mdd_kx_log2: int = 1, # Integer greater than >=1. Number of dirac-deltas along x is 2^mdd_kx_log2 mdd_ky_log2: int = 1, # Integer greater than >=1. Number of dirac-deltas along y is 2^mdd_ky_log2 mdd_kz_log2: int = 1 # Integer greater than >=1. Number of dirac-deltas along z is 2^mdd_kz_log2 ): """ Create initial state preparation circuit with configurable parameters. Parameters ---------- n : int Number of qubits per spatial dimension (grid size = 2^n per axis) init_state_name : str One of "dirac_delta", "sin", "gaussian" sine_k_x, sine_k_y, sine_k_z : float Frequency multipliers for sinusoidal distribution (default=1.0) gauss_cx, gauss_cy, gauss_cz : float Center coordinates in [0,1] for Gaussian (default=0.5) gauss_sigma : float Spread of Gaussian in normalized units (default=0.2) mdd_kx_log2, mdd_ky_log2, mdd_kz_log2 : int log2 of frequency multipliers for dirac-delta array distribution (default=1) Returns ------- QuantumCircuit State preparation circuit """ N = 2**n init_state_prep_circ = QuantumCircuit(3*n) if init_state_name == "dirac_delta": init_state_prep_circ.x(n-1) init_state_prep_circ.x(2*n-1) init_state_prep_circ.x(3*n-1) elif init_state_name == "multi_dirac_delta": init_state_prep_circ.h(range(n-mdd_kx_log2,n)) init_state_prep_circ.x(n-1-mdd_kx_log2) init_state_prep_circ.h(range(2*n-mdd_ky_log2,2*n)) init_state_prep_circ.x(2*n-1-mdd_ky_log2) init_state_prep_circ.h(range(3*n-mdd_kz_log2,3*n)) init_state_prep_circ.x(3*n-1-mdd_kz_log2) elif init_state_name == "sin": # Configurable frequency sinusoidal distribution # f(x,y,z) = 1 + sin(2π * kx * x) * sin(2π * ky * y) * sin(2π * kz * z) kx = max(1, int(round(float(sine_k_x)))) ky = max(1, int(round(float(sine_k_y)))) kz = max(1, int(round(float(sine_k_z)))) coords = np.arange(N) / N # Normalized [0, 1) sin_x = np.sin(2 * np.pi * kx * coords) sin_y = np.sin(2 * np.pi * ky * coords) sin_z = np.sin(2 * np.pi * kz * coords) # Build 3D state via Kronecker products # Order matches original: z ⊗ (y ⊗ x) init_state = 1 + ( np.kron(sin_z, np.ones(N**2)) * np.kron(np.ones(N**2), sin_x) * np.kron(np.ones(N), np.kron(sin_y, np.ones(N))) ) init_state_prep_circ.prepare_state(init_state.astype(np.complex128), normalize=True) init_state_prep_circ = transpile(init_state_prep_circ, basis_gates=['u1', 'u2', 'u3', 'cx']) elif init_state_name == "gaussian": # Configurable Gaussian distribution # f(x,y,z) = exp(-((x-cx)^2 + (y-cy)^2 + (z-cz)^2) / (2*sigma^2)) # Default centers to 0.5 (middle of domain) - matches original mu=0.5 cx = float(gauss_cx) if gauss_cx is not None else 0.5 cy = float(gauss_cy) if gauss_cy is not None else 0.5 cz = float(gauss_cz) if gauss_cz is not None else 0.5 # Default sigma to 1.0 to match original sig=1 behavior # Original formula: exp(-((x - mu) / sig)^2 / 2) with sig=1 # Our formula: exp(-((x - cx)^2) / (2 * sigma^2)) # For equivalence: sigma = 1.0 (they are the same formula) sigma = float(gauss_sigma) if gauss_sigma is not None else 1.0 coords = np.arange(N) / N # Normalized [0, 1) gauss_x = np.exp(-((coords - cx)**2) / (2 * sigma**2)) gauss_y = np.exp(-((coords - cy)**2) / (2 * sigma**2)) gauss_z = np.exp(-((coords - cz)**2) / (2 * sigma**2)) # Build 3D state via Kronecker products (same order as original) init_state = ( np.kron(gauss_z, np.ones(N**2)) * np.kron(np.ones(N**2), gauss_x) * np.kron(np.ones(N), np.kron(gauss_y, np.ones(N))) ) init_state_prep_circ.prepare_state(init_state.astype(np.complex128), normalize=True) init_state_prep_circ = transpile(init_state_prep_circ, basis_gates=['u1', 'u2', 'u3', 'cx']) return init_state_prep_circ ########################################################################################## from qiskit_ibm_runtime import QiskitRuntimeService, SamplerV2 as Sampler from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager import pprint # import mthree try: # Try relative import first (best for package usage) from .visualize_counts import load_samples, estimate_density, plot_density_isosurface, plot_density_isosurface_slider except ImportError: try: # Try absolute import with package prefix from qlbm.visualize_counts import load_samples, estimate_density, plot_density_isosurface, plot_density_isosurface_slider except ImportError: # Fallback to direct import (for script usage) from visualize_counts import load_samples, estimate_density, plot_density_isosurface, plot_density_isosurface_slider def run_sampling_hw_ibm( n, ux, uy, uz, init_state_prep_circ, T_list, shots=2**14, vel_resolution=32, output_resolution=40, logger=None, flag_qubits=True, progress_callback=None, ): """ Run QLBM simulation on IBM quantum hardware. Parameters ---------- n : int Number of qubits per spatial dimension ux, uy, uz : callable or str Velocity field components init_state_prep_circ : QuantumCircuit Pre-built initial state preparation circuit from get_named_init_state_circuit() T_list : list[int] List of timesteps to simulate shots : int Number of measurement shots (default: 2^19) vel_resolution : int Resolution for velocity field discretization output_resolution : int Grid resolution for density estimation output logger : callable, optional Function to log messages (e.g. print to console) progress_callback : callable, optional Function to report progress (0-100) with optional status message: progress_callback(percent, message) Returns ------- job : IBMJob The submitted job object get_job_result : callable Callback function to retrieve and process results. Returns (output, fig). """ import time as time_module def log(msg): if logger: logger(str(msg)) else: print(msg) def update_progress(percent, message=None): if progress_callback: progress_callback(percent, message) # === STEP 1: Circuit Generation (0-50%) === log("Step 1: Generating quantum circuits...") update_progress(5, "Generating quantum circuits...") qc_list=get_circuit(n,ux,uy,uz,init_state_prep_circ,T_list,vel_resolution,flag_qubits=flag_qubits) log(f"Generated {len(qc_list)} circuit(s) for timesteps {T_list}") update_progress(15, f"Generated {len(qc_list)} circuits") pm_optimization_level = 3 log("Connecting to IBM Quantum service...") update_progress(20, "Connecting to IBM Quantum...") ibm_token = _require_env("API_KEY_IBM_QLBM", context="IBM QLBM QPU execution") service = QiskitRuntimeService( channel="ibm_cloud", token=ibm_token, instance="crn:v1:bluemix:public:quantum-computing:us-east:a/15157e4350c04a9dab51b8b8a4a93c86:e29afd91-64bf-4a82-8dbf-731e6c213595::", ) backend = service.least_busy() log(f"Selected backend: {backend.name}") update_progress(25, f"Backend: {backend.name}") qc_compiled_list=[] total_circuits = len(qc_list) for idx, qc in enumerate(qc_list): circuit_progress = 25 + (idx / total_circuits) * 20 # 25-45% update_progress(circuit_progress, f"Transpiling circuit {idx+1}/{total_circuits}...") pm = generate_preset_pass_manager(backend=backend, optimization_level=pm_optimization_level) log(f"Transpiling circuit {idx+1}/{total_circuits} via PassManager...") qc_compiled = pm.run(qc) log(f" Compiled: {qc_compiled.num_qubits} qubits, {qc_compiled.num_clbits} clbits, depth={qc_compiled.depth()}") qc_compiled_list+=[qc_compiled] log("All circuits transpiled successfully.") update_progress(50, "Circuits ready. Submitting job...") # === STEP 2: Job Submission & Monitoring (50-90%) === sampler = Sampler(mode=backend) job = sampler.run(qc_compiled_list, shots=shots) job_id = job.job_id() if hasattr(job, 'job_id') else str(job) log(f"Job submitted! Job ID: {job_id}") update_progress(52, f"Job submitted: {job_id}") def get_job_result(j): """Poll job status and retrieve results with progress updates.""" # Job status polling with progress updates log("Monitoring job status...") update_progress(55, "Job queued, waiting for execution...") # Status mapping for progress estimation # JobStatus: INITIALIZING, QUEUED, VALIDATING, RUNNING, CANCELLED, DONE, ERROR status_progress_map = { 'INITIALIZING': (55, "Job initializing..."), 'QUEUED': (58, "Job queued, waiting..."), 'VALIDATING': (62, "Validating job..."), 'RUNNING': (70, "Job running on QPU..."), 'DONE': (85, "Job completed!"), 'ERROR': (85, "Job error occurred"), 'CANCELLED': (85, "Job cancelled"), } last_status = None poll_count = 0 max_polls = 600 # ~10 minutes with 1s interval while poll_count < max_polls: try: status = j.status() status_name = status.name if hasattr(status, 'name') else str(status) if status_name != last_status: last_status = status_name progress_val, status_msg = status_progress_map.get(status_name, (60, f"Status: {status_name}")) log(f"Job Status: {status_name}") update_progress(progress_val, status_msg) # Check if job is complete if status_name in ('DONE', 'ERROR', 'CANCELLED'): break # Increment progress slightly while waiting (indeterminate feel) if status_name == 'QUEUED': # Slowly increment between 58-65 while queued queue_progress = 58 + min(7, poll_count * 0.05) update_progress(queue_progress, f"Queued... (waiting {poll_count}s)") elif status_name == 'RUNNING': # Slowly increment between 70-85 while running run_progress = 70 + min(15, poll_count * 0.1) update_progress(run_progress, f"Running on QPU... ({poll_count}s)") time_module.sleep(1) poll_count += 1 except Exception as e: log(f"Status check error: {e}") time_module.sleep(2) poll_count += 2 # Get results log("Retrieving job results...") update_progress(87, "Retrieving results...") result = j.result() log("Results retrieved successfully.") update_progress(90, "Processing results...") # === STEP 3: Creating Plots (90-100%) === output=[] total_timesteps = len(T_list) for idx, (T_total, pub) in enumerate(zip(T_list, result)): plot_progress = 90 + (idx / total_timesteps) * 8 # 90-98% update_progress(plot_progress, f"Processing timestep T={T_total}...") try: joined = pub.join_data() joined_counts = joined.get_counts() except Exception as e: log(f"Error retrieving counts for T={T_total}: {e}") joined_counts = None pts, counts = load_samples(joined_counts, T_total, logger=None, flag_qubits=flag_qubits) output+=[estimate_density(pts, counts, bandwidth=0.05, grid_size=output_resolution)] log(f"Processing complete: {len(output)} timestep(s)") update_progress(98, "Creating visualization...") fig = plot_density_isosurface_slider(output, T_list) update_progress(100, "Complete!") log("IBM QPU job completed successfully.") return output, fig return job, get_job_result from qiskit_ionq import IonQProvider class MissingCredentialError(RuntimeError): """Raised when a required API key/secret is not available at runtime.""" def _require_env(var_name: str, *, context: str) -> str: """Return a required environment variable or raise a helpful error.""" value = os.environ.get(var_name) if value is None or str(value).strip() == "": raise MissingCredentialError( f"Missing required secret '{var_name}'. " f"Set it as a runtime environment variable (e.g., Hugging Face Space → Settings → Secrets) " f"before running {context}." ) return value def run_sampling_hw_ionq( n, ux, uy, uz, init_state_prep_circ, T_list, shots=2**14, vel_resolution=32, output_resolution=40, logger=None, flag_qubits=True, progress_callback=None, ): """ Run QLBM simulation on IonQ quantum hardware. Parameters ---------- n : int Number of qubits per spatial dimension ux, uy, uz : callable or str Velocity field components init_state_prep_circ : QuantumCircuit Pre-built initial state preparation circuit from get_named_init_state_circuit() T_list : list[int] List of timesteps to simulate shots : int Number of measurement shots (default: 2^19) vel_resolution : int Resolution for velocity field discretization output_resolution : int Grid resolution for density estimation output logger : callable, optional Function to log messages (e.g. print to console) progress_callback : callable, optional Function to report progress (0-100) with optional status message: progress_callback(percent, message) Returns ------- job : IonQ Job The submitted job object get_job_result : callable Callback function to retrieve and process results. Returns (output, fig). """ import time as time_module def log(msg): if logger: logger(str(msg)) else: print(msg) def update_progress(percent, message=None): if progress_callback: progress_callback(percent, message) # === STEP 1: Circuit Generation (0-50%) === log("Step 1: Generating quantum circuits...") update_progress(5, "Generating quantum circuits...") # Ensure credentials are present (HF secrets) and make them discoverable # by qiskit-ionq's default environment variable names. ionq_token = _require_env("API_KEY_IONQ_QLBM", context="IonQ QLBM QPU execution") os.environ.setdefault("IONQ_API_TOKEN", ionq_token) provider = IonQProvider() # backend = provider.get_backend("simulator") backend = provider.get_backend("qpu.forte-enterprise-1") # Use backend.name (property) instead of backend.name() (method) for Qiskit compatibility backend_name = backend.name if isinstance(backend.name, str) else backend.name() log(f"Selected IonQ backend: {backend_name}") update_progress(15, f"Backend: {backend_name}") qc_list=get_circuit(n,ux,uy,uz,init_state_prep_circ,T_list,vel_resolution,flag_qubits=flag_qubits,midcircuit_meas=False) log(f"Generated {len(qc_list)} circuit(s) for timesteps {T_list}") update_progress(45, f"Generated {len(qc_list)} circuits") # Transpile circuits for IonQ with optimization_level=1 (recommended by IonQ) log("Transpiling circuits for IonQ (optimization_level=1)...") qc_list_transpiled = transpile(qc_list, backend=backend, optimization_level=1) update_progress(48, "Circuits transpiled") # === STEP 2: Job Submission (50%) === log("Submitting job to IonQ...") update_progress(50, "Submitting job to IonQ...") job = backend.run(qc_list_transpiled, shots=shots) job_id = job.job_id() if hasattr(job, 'job_id') else str(job) log(f"Job submitted! Job ID: {job_id}") update_progress(52, f"Job submitted: {job_id}") def get_job_result(j): """Poll job status and retrieve results with progress updates.""" log("Monitoring IonQ job status...") update_progress(55, "Job queued, waiting for execution...") # IonQ job status polling last_status = None poll_count = 0 max_polls = 60000 # ~10 minutes with 1s interval while poll_count < max_polls: try: status = j.status() status_name = status.name if hasattr(status, 'name') else str(status) if status_name != last_status: last_status = status_name log(f"Job Status: {status_name}") if status_name in ('QUEUED', 'VALIDATING'): update_progress(58, f"Status: {status_name}") elif status_name == 'RUNNING': update_progress(70, "Job running on IonQ QPU...") elif status_name == 'DONE': update_progress(85, "Job completed!") break elif status_name in ('ERROR', 'CANCELLED'): update_progress(85, f"Job {status_name.lower()}") break # Increment progress slightly while waiting if status_name == 'QUEUED': queue_progress = 58 + min(7, poll_count * 0.05) update_progress(queue_progress, f"Queued... (waiting {poll_count}s)") elif status_name == 'RUNNING': run_progress = 70 + min(15, poll_count * 0.1) update_progress(run_progress, f"Running on IonQ... ({poll_count}s)") # Check if done if status_name in ('DONE', 'ERROR', 'CANCELLED'): break time_module.sleep(1) poll_count += 1 except Exception as e: log(f"Status check error: {e}") time_module.sleep(2) poll_count += 2 log("Retrieving IonQ job results...") update_progress(87, "Retrieving results...") # === STEP 3: Creating Plots (90-100%) === update_progress(90, "Processing results...") output=[] total_timesteps = len(T_list) for i, T_total in enumerate(T_list): plot_progress = 90 + (i / total_timesteps) * 8 # 90-98% update_progress(plot_progress, f"Processing timestep T={T_total}...") counts = j.get_counts(i) pts, counts = load_samples(counts, T_total, logger=None, flag_qubits=flag_qubits, midcircuit_meas=False) output+=[estimate_density(pts, counts, bandwidth=0.05, grid_size=output_resolution)] log(f"Processing complete: {len(output)} timestep(s)") update_progress(98, "Creating visualization...") fig = plot_density_isosurface_slider(output, T_list) update_progress(100, "Complete!") log("IonQ job completed successfully.") return output, fig return job, get_job_result from qiskit_aer import AerSimulator def run_sampling_sim( n, ux, uy, uz, init_state_prep_circ, T_list, vel_resolution=32, progress_callback=None, ): """ Run QLBM simulation on local Aer statevector simulator. Parameters ---------- n : int Number of qubits per spatial dimension ux, uy, uz : callable or str Velocity field components init_state_prep_circ : QuantumCircuit Pre-built initial state preparation circuit from get_named_init_state_circuit() T_list : list[int] List of timesteps to simulate vel_resolution : int Resolution for velocity field discretization progress_callback : callable, optional Function to report progress (0-100) Returns ------- output : list[ndarray] List of 3D density arrays, one per timestep fig : go.Figure Plotly figure with slider animation through all timesteps (includes T=0 snapshot when available) """ # if type(ux)==str: # ux,uy,uz=str_to_lambda(ux,uy,uz) # # Convert string init_state_prep_circ to circuit if needed (matches original logic) init_state_label = init_state_prep_circ if isinstance(init_state_prep_circ, str) else "custom" # if isinstance(init_state_prep_circ, str): # init_state_prep_circ=get_named_init_state_circuit(n,init_state_prep_circ) initial_snapshot = None try: initial_snapshot = show_initial_distribution( n=n, init_state_name=str(init_state_label), plot=False, return_data=True, init_state_circuit=init_state_prep_circ, ) except Exception as exc: print(f"Warning: Unable to compute initial distribution snapshot: {exc}") qc_list=get_circuit(n,ux,uy,uz,init_state_prep_circ,T_list,vel_resolution,measure=False) backend = AerSimulator(method = 'statevector') output=[] total_steps = len(qc_list) for i, qc in enumerate(qc_list): if progress_callback: percent = int((i / total_steps) * 100) progress_callback(percent) qc_transpiled=qc qc_transpiled.save_statevector(conditional=True) # Try multiple shots to find a successful (zero-branch) outcome max_attempts = 100 success = False for attempt in range(max_attempts): job = backend.run(qc_transpiled, memory=True, shots=1) result = job.result() data_all = result.data() statevector_keys = list(dict(data_all['statevector']).keys()) # Look for the zero branch (0x0) zero_key = None for key in statevector_keys: if '0x' in key: if int(key[2:], 16) == 0: zero_key = key break elif key == '0' or key == '00' or key.replace('0', '') == '': zero_key = key break if zero_key is not None: success = True break if attempt < max_attempts - 1: print(f"Attempt {attempt + 1} failed (got branch {statevector_keys[0]}), retrying...") if not success: # If all attempts failed, use the first available branch with a warning print(f"Warning: Could not get zero branch after {max_attempts} attempts. Using first available branch.") zero_key = statevector_keys[0] zero_branch_state = data_all['statevector'][zero_key] sv_mean=np.mean(np.array(zero_branch_state)[:(2**n)**dim]) sv_phase=sv_mean/np.abs(sv_mean) final_answer = np.real(np.array(zero_branch_state)[:(2**n)**dim]/sv_phase) C = np.reshape(np.array(final_answer),tuple(2**n for _ in range(dim))) output+=[C] # Create meshgrid for coordinates (used for plotting) x_coords = np.linspace(0, 1, 2**n) X = np.meshgrid(x_coords, x_coords, x_coords, indexing='ij') outputs_for_plot = output.copy() times_for_plot = list(T_list) if initial_snapshot is not None: initial_density, _ = initial_snapshot if times_for_plot and times_for_plot[0] == 0: outputs_for_plot[0] = initial_density else: outputs_for_plot = [initial_density] + outputs_for_plot times_for_plot = [0] + times_for_plot # Create figure with slider for all timesteps (including T=0 if available) fig = _create_slider_figure(outputs_for_plot, times_for_plot, X) return output, fig def _create_slider_figure(output_list, T_list, X): """ Create a Plotly figure with slider to animate through timesteps. Uses visibility toggling instead of frames for better compatibility. Parameters ---------- output_list : list[ndarray] List of 3D density arrays from simulation T_list : list[int] List of timestep values X : tuple of ndarrays Meshgrid coordinates Returns ------- fig : go.Figure Plotly figure with slider animation """ # Compute global min/max for consistent color scaling global_min = min(np.min(C) for C in output_list) global_max = max(np.max(C) for C in output_list) fig = go.Figure() # Add a trace for each timestep for i, (C, T) in enumerate(zip(output_list, T_list)): visible = (i == 0) # Only the first trace is visible initially fig.add_trace(go.Isosurface( x=X[2].flatten(), y=X[1].flatten(), z=X[0].flatten(), value=C.flatten(), isomin=global_min, isomax=global_max, opacity=0.4, surface_count=10, caps=dict(x_show=False, y_show=False, z_show=False), colorscale=QLBM_PLOT_COLORSCALE, colorbar=dict(title="Density"), visible=visible, name=f"T={T}" )) # Create slider steps steps = [] for i, T in enumerate(T_list): # Create visibility array: only the i-th trace is True step = dict( method="update", args=[{"visible": [False] * len(output_list)}, {"title": f"QLBM Simulation - Timestep T={T}"}], label=str(T) ) step["args"][0]["visible"][i] = True # Toggle i-th trace to True steps.append(step) sliders = [dict( active=0, currentvalue={"prefix": "Timestep: "}, pad={"t": 50}, steps=steps )] fig.update_layout( title=f"QLBM Simulation - Timestep T={T_list[0]}", scene=dict( xaxis_title="X", yaxis_title="Y", zaxis_title="Z", aspectmode='cube', ), sliders=sliders ) return fig def show_initial_distribution( n: int, init_state_name: str = "sin", # Sinusoidal parameters (frequency multipliers) sine_k_x: float = 1.0, sine_k_y: float = 1.0, sine_k_z: float = 1.0, # Gaussian parameters gauss_cx: float = None, gauss_cy: float = None, gauss_cz: float = None, gauss_sigma: float = None, # Multi-dirac-delta parameters mdd_kx_log2: int = 1, mdd_ky_log2: int = 1, mdd_kz_log2: int = 1, # Display options plot: bool = True, return_data: bool = False, init_state_circuit=None, ): """ Visualize the initial distribution by running the state preparation circuit from get_named_init_state_circuit and extracting the resulting statevector. Parameters ---------- n : int Number of qubits per spatial dimension (grid size = 2^n per axis) init_state_name : str One of "dirac_delta", "sin", "gaussian", "multi_dirac_delta" sine_k_x, sine_k_y, sine_k_z : float Frequency multipliers for sinusoidal distribution (default=1.0) gauss_cx, gauss_cy, gauss_cz : float Center coordinates in [0,1] for Gaussian (default=0.5) gauss_sigma : float Spread of Gaussian in normalized units (default=0.2) mdd_kx_log2, mdd_ky_log2, mdd_kz_log2 : int log2 of frequency multipliers for multi-dirac-delta distribution (default=1) plot : bool Whether to display the 3D isosurface plot (default=True) return_data : bool Whether to return the distribution data (default=False) init_state_circuit : QuantumCircuit, optional Pre-built circuit to evaluate. When provided, ``init_state_name`` and the associated parameters are ignored and the supplied circuit is simulated directly. Returns ------- If return_data is True: C : ndarray 3D array of shape (2^n, 2^n, 2^n) containing the initial distribution X : tuple of ndarrays Meshgrid coordinates (X[0], X[1], X[2]) for x, y, z axes If return_data is False: None """ N = 2**n # Determine the state preparation circuit. Either use the provided circuit # (useful when custom circuits are passed in) or construct one from the # named presets for standalone previews. if init_state_circuit is not None: init_state_prep_circ = init_state_circuit.copy() else: init_state_prep_circ = get_named_init_state_circuit( n, init_state_name, sine_k_x=sine_k_x, sine_k_y=sine_k_y, sine_k_z=sine_k_z, gauss_cx=gauss_cx, gauss_cy=gauss_cy, gauss_cz=gauss_cz, gauss_sigma=gauss_sigma, mdd_kx_log2=mdd_kx_log2, mdd_ky_log2=mdd_ky_log2, mdd_kz_log2=mdd_kz_log2, ) # Run the circuit on statevector simulator to extract the initial state backend = AerSimulator(method='statevector') # Create a copy of the circuit and save statevector qc = init_state_prep_circ.copy() qc.save_statevector() job = backend.run(qc, shots=1) result = job.result() statevector = np.array(result.get_statevector()) # The statevector represents the initial distribution (amplitudes) # Take the real part of the amplitudes (they should be real for these distributions) init_state = np.real(statevector) # Reshape to 3D grid C = np.reshape(init_state, (N, N, N)) # Create meshgrid for coordinates x_coords = np.linspace(0, 1, N) X = np.meshgrid(x_coords, x_coords, x_coords, indexing='ij') if plot: print(f"Initial distribution: {init_state_name}") print(f"Grid size: {N} x {N} x {N}") if init_state_name == "sin": print(f"Sine frequencies: kx={sine_k_x}, ky={sine_k_y}, kz={sine_k_z}") elif init_state_name == "gaussian": cx = float(gauss_cx) if gauss_cx is not None else 0.5 cy = float(gauss_cy) if gauss_cy is not None else 0.5 cz = float(gauss_cz) if gauss_cz is not None else 0.5 sigma = float(gauss_sigma) if gauss_sigma is not None else 0.2 print(f"Gaussian center: ({cx}, {cy}, {cz}), sigma={sigma}") elif init_state_name == "multi_dirac_delta": print(f"Multi-Dirac-Delta: kx_log2={mdd_kx_log2}, ky_log2={mdd_ky_log2}, kz_log2={mdd_kz_log2}") print(f" Number of peaks: {2**mdd_kx_log2} x {2**mdd_ky_log2} x {2**mdd_kz_log2}") print("Distribution stats:") print(f" Min: {np.min(C):.6f}, Max: {np.max(C):.6f}") print(f" Mean: {np.mean(C):.6f}, Std: {np.std(C):.6f}") Cmax, Cmin = np.max(C.flatten()), np.min(C.flatten()) fig = go.Figure(data=go.Isosurface( x=X[2].flatten(), y=X[1].flatten(), z=X[0].flatten(), value=C.flatten(), isomin=Cmin, isomax=Cmax, opacity=0.4, surface_count=10, caps=dict(x_show=False, y_show=False, z_show=False), colorscale=QLBM_PLOT_COLORSCALE, )) fig.update_layout( title=f"Initial Distribution: {init_state_name}", scene=dict( xaxis_title="X", yaxis_title="Y", zaxis_title="Z", ), ) fig.show() if return_data: return C, X return None if __name__=="__main__": n=3 # Step 1: Create the initial state circuit ONCE with all parameters # init_state_prep_circ = get_named_init_state_circuit( # n=n, # init_state_name="multi_dirac_delta", # or "gaussian", "dirac_delta" # ) # # Alternative: Run on local simulator # output, fig = run_sampling_sim( # n=n, # ux="sin(-2*pi*z)", # uy="1", # uz="sin(2*pi*x)", # init_state_prep_circ="multi_dirac_delta", # T_list=[1,3,5,7,9], # vel_resolution=16 # ) # print(output) # fig.show(renderer="browser") # Step 2: (Optional) Preview the initial distribution # show_initial_distribution(n=n, init_state_name="sin", sine_k_x=1, sine_k_y=1, sine_k_z=1) # Step 3: Run simulation - pass the pre-built circuit job, get_job_result = run_sampling_hw_ionq( n=n, ux="1", uy="1", uz="1", init_state_prep_circ="multi_dirac_delta", # Pass the circuit directly T_list=[1,2], shots=2**15, vel_resolution=2, output_resolution=16 ) output,fig = get_job_result(job) fig.show(renderer="browser")