quantum / qlbm /qlbm_sample_app.py
harishaseebat92
EM : IBM/ION Job Retrieve Upload Window
6ca7a4a
from qiskit import QuantumCircuit,QuantumRegister,ClassicalRegister,transpile
from qiskit.synthesis.qft import synth_qft_full as QFT
import numpy as np
import os
from sympy import sympify, symbols, lambdify
from qiskit_ibm_runtime import QiskitRuntimeService
import plotly.graph_objects as go
# Number of spatial axes; one position register and one pair of direction
# qubits is allocated per axis throughout this module.
dim=3
# Plotly colorscale name passed to the isosurface traces built in this module.
QLBM_PLOT_COLORSCALE = "Turbo"
def bin_to_gray(bin_s):
    """Convert a binary digit string into its Gray-code digit string.

    The leading digit is copied unchanged; each following output digit is
    the XOR of two neighbouring input digits.
    """
    gray_digits = [bin_s[0]]
    for left, right in zip(bin_s, bin_s[1:]):
        # Two truth values differ exactly when their XOR is true.
        differs = bool(int(left)) != bool(int(right))
        gray_digits.append(str(int(differs)))
    return "".join(gray_digits)
def gray_to_bin(gray_s):
    """Convert a Gray-code digit string back into a binary digit string.

    The leading digit is copied unchanged; each following output digit is
    the XOR of the previously produced binary digit and the next Gray digit.
    """
    bin_digits = [gray_s[0]]
    for gray_digit in gray_s[1:]:
        toggled = bool(int(bin_digits[-1])) != bool(int(gray_digit))
        bin_digits.append(str(int(toggled)))
    return "".join(bin_digits)
def bin_to_int(bin_s):
    """Parse a base-2 digit string and return the integer it encodes."""
    value = int(bin_s, base=2)
    return value
def int_to_bin(i,pad):
    """Return the base-2 digits of a non-negative integer, left-padded with zeros.

    Parameters
    ----------
    i : int
        Non-negative integer to encode.
    pad : int
        Minimum width of the result; wider values are not truncated.

    Raises
    ------
    ValueError
        If ``i`` is negative. Slicing ``bin(i)`` would otherwise leave the
        ``'b'`` of the ``'-0b'`` prefix inside the returned string.
    """
    if i < 0:
        raise ValueError(f"int_to_bin expects a non-negative integer, got {i}")
    # bin() prefixes its output with '0b'; drop it before padding.
    return bin(i)[2:].zfill(pad)
def fwht_approx(f,N,num_points_per_dim,threshold=1e-10):
    """Build a sparse table of angles derived from ``2*arccos(f)`` on an N^3 grid.

    The grid is split into ``num_points_per_dim`` blocks per axis. For every
    block an average angle (``f`` sampled at the block centre) and one slope
    per axis are stored, then the table is combined in place with
    (x+y)/2, (x-y)/2 butterflies — presumably a fast Walsh-Hadamard
    transform, going by the function name. Small entries are dropped at the
    end.

    Parameters
    ----------
    f : callable
        ``f(x, y, z)`` called with grid coordinates; its result is fed to
        ``np.arccos``.
    N : int
        Grid points per axis. Flat table keys have the form x + N*y + N**2*z.
    num_points_per_dim : int
        Number of blocks per axis.
    threshold : float
        Initial cut-off used when discarding small entries (see the end of
        the function for how it is adjusted).

    Returns
    -------
    tuple[list, list]
        The retained angle values and, in the same order, their table keys.
    """
    linear_block_size=int(N//num_points_per_dim)
    # assumes linear_block_size is a power of two — TODO confirm against callers
    num_angles_per_block=int(np.log2(linear_block_size))
    thetas={}
    for k in range(num_points_per_dim):
        for j in range(num_points_per_dim):
            for i in range(num_points_per_dim):
                # Angle at the block centre, stored at the block's origin key.
                avg_f=2*np.arccos(f(i*linear_block_size+(linear_block_size-1)/2,j*linear_block_size+(linear_block_size-1)/2,k*linear_block_size+(linear_block_size-1)/2))
                thetas[k*(N**2)*linear_block_size+j*N*linear_block_size+i*linear_block_size]=avg_f
                # Per-axis slope: angle at this block's start minus angle at the
                # next block's start, divided by the block size.
                # NOTE(review): the wrap-around uses `% N`, not `% num_points_per_dim`,
                # so it only wraps when num_points_per_dim == N; otherwise the last
                # block samples f at coordinate N — confirm this is intended.
                slope_x=(2*np.arccos(f(i*linear_block_size,j*linear_block_size+(linear_block_size-1)/2,k*linear_block_size+(linear_block_size-1)/2))-2*np.arccos(f(((i+1)%N)*linear_block_size,j*linear_block_size+(linear_block_size-1)/2,k*linear_block_size+(linear_block_size-1)/2)))/linear_block_size
                slope_y=(2*np.arccos(f(i*linear_block_size+(linear_block_size-1)/2,j*linear_block_size,k*linear_block_size+(linear_block_size-1)/2))-2*np.arccos(f(i*linear_block_size+(linear_block_size-1)/2,((j+1)%N)*linear_block_size,k*linear_block_size+(linear_block_size-1)/2)))/linear_block_size
                slope_z=(2*np.arccos(f(i*linear_block_size+(linear_block_size-1)/2,j*linear_block_size+(linear_block_size-1)/2,k*linear_block_size))-2*np.arccos(f(i*linear_block_size+(linear_block_size-1)/2,j*linear_block_size+(linear_block_size-1)/2,((k+1)%N)*linear_block_size)))/linear_block_size
                for m in range(num_angles_per_block):
                    # Slope terms live at power-of-two offsets along each axis
                    # (stride 1 for x, N for y, N**2 for z), scaled by 2**(m-1).
                    thetas[k*(N**2)*linear_block_size+j*N*linear_block_size+i*linear_block_size + 2**m]=slope_x*(2**(m-1))
                    thetas[k*(N**2)*linear_block_size+j*N*linear_block_size+i*linear_block_size + N*(2**m)]=slope_y*(2**(m-1))
                    thetas[k*(N**2)*linear_block_size+j*N*linear_block_size+i*linear_block_size + (N**2)*(2**m)]=slope_z*(2**(m-1))
    # In-place butterfly passes; h is the distance between the paired entries.
    h = linear_block_size
    while h < N**3:
        for i in range(0, N**3, h * 2):
            # Only start from indices whose y and z parts sit on a block origin.
            if (i//N)%linear_block_size!=0:
                continue
            if (i//(N**2))%linear_block_size!=0:
                continue
            j=i
            while j<i+h:
                # Combine the block-average entries ...
                index=j
                x = thetas[index]
                y = thetas[index + h]
                thetas[index] = (x + y)/2
                thetas[index + h] = (x - y)/2
                # ... and the slope entries of the same pair of blocks.
                for ax in range(3):
                    for m in range(num_angles_per_block):
                        index=j+(N**ax)*(2**m)
                        x = thetas[index]
                        y = thetas[index + h]
                        thetas[index] = (x + y)/2
                        thetas[index + h] = (x - y)/2
                # Advance to the next block origin, skipping the rows/planes
                # that are interior to a block.
                j+=linear_block_size
                if (j//N)%linear_block_size==1:
                    j+=(linear_block_size-1)*N
                if (j//(N**2))%linear_block_size==1:
                    j+=(linear_block_size-1)*(N**2)
        h *= 2
        # Jump over the strides that would pair entries inside one block
        # along y and z.
        if h==N:
            h=N*linear_block_size
        if h==N**2:
            h=(N**2)*linear_block_size
    # Adaptive cut-off: accumulate magnitudes in ascending order until the
    # running sum exceeds `threshold`, then use the sum of the preceding
    # (smaller) magnitudes as the new cut-off.
    # NOTE(review): individual magnitudes are then compared against this
    # cumulative sum — confirm that this is the intended truncation rule.
    theta_sorted=sorted(np.abs(list(thetas.values())))
    sum_=0
    for th in theta_sorted:
        sum_+=th
        if sum_>threshold:
            threshold=sum_-th
            break
    return [theta for theta in thetas.values() if abs(theta)>threshold],[key for key in thetas.keys() if abs(thetas[key])>threshold]
def get_circuit_inputs(f,num_reg_qubits,num_points_per_dim):
    """Turn the sparse angle table of ``f`` into rotation angles plus CX control lists.

    Returns
    -------
    tuple[list, list]
        ``(angles, ctrls)``. ``angles`` are the retained angles ordered by
        their Gray-decoded table index. ``ctrls`` has one more entry than
        ``angles``: entry ``i`` lists the bit positions (counted from the
        least-significant end) that differ between consecutive Gray-coded
        positions, and the final entry returns to the all-zero string.
    """
    width = 3 * num_reg_qubits
    all_zero = "0" * width

    def changed_bits(old_bits, new_bits):
        # Positions, counted from the right, at which the two strings differ.
        pairs = zip(old_bits[::-1], new_bits[::-1])
        return [pos for pos, (old, new) in enumerate(pairs) if old != new]

    theta_vec, indices = fwht_approx(f, 2**num_reg_qubits, num_points_per_dim, 1e-4)
    circ_pos = [bin_to_int(gray_to_bin(int_to_bin(ind, width))) for ind in indices]
    ordered = sorted(zip(theta_vec, circ_pos), key=lambda pair: pair[1])

    ctrls = []
    previous = all_zero
    for _, position in ordered:
        current = bin_to_gray(int_to_bin(position % (2**width), width))
        ctrls.append(changed_bits(previous, current))
        previous = current
    # Closing step: flip whatever is needed to get back to all zeros.
    ctrls.append(changed_bits(previous, all_zero))

    return [theta for theta, _ in ordered], ctrls
def get_coeffs(n,ux,uy,uz,resolution=32):
    """Compute the rotation angles and control lists for all collision sub-circuits.

    Eight amplitude functions are derived from the velocity components and
    each is passed through ``get_circuit_inputs``. Velocities are sampled at
    grid coordinates divided by ``2**n``.

    Returns
    -------
    tuple
        Sixteen items, i.e. ``(coeffs, ctrls)`` pairs flattened in the order
        x, y, z, x_, y_, z_, unprep1, unprep2.
    """
    current_N = 2**n
    points_per_dim = min(current_N, resolution)

    def sample(u, x, y, z):
        return u(x/current_N, y/current_N, z/current_N)

    def encode(amplitude):
        return get_circuit_inputs(amplitude, n, points_per_dim)

    def forward_amplitude(u):
        return lambda x, y, z: ((1 + sample(u, x, y, z))/2)**0.5

    def upstream_amplitude(u, dx, dy, dz):
        # Uses u one cell behind and one cell ahead along the (dx, dy, dz) axis.
        def amplitude(x, y, z):
            behind = sample(u, x-dx, y-dy, z-dz)
            if (1 + behind) == 0:
                return 0
            ahead = sample(u, x+dx, y+dy, z+dz)
            return ((1 + behind)/(2 + behind - ahead))**0.5
        return amplitude

    def central_gap(u, dx, dy, dz):
        # u(one cell behind) - u(one cell ahead) along the given axis.
        return lambda x, y, z: sample(u, x-dx, y-dy, z-dz) - sample(u, x+dx, y+dy, z+dz)

    gap_x = central_gap(ux, 1, 0, 0)
    gap_y = central_gap(uy, 0, 1, 0)

    amplitudes = [
        forward_amplitude(ux),
        forward_amplitude(uy),
        forward_amplitude(uz),
        upstream_amplitude(ux, 1, 0, 0),
        upstream_amplitude(uy, 0, 1, 0),
        upstream_amplitude(uz, 0, 0, 1),
        lambda x, y, z: (1/3**0.5)*(1 + gap_x(x, y, z)/2)**0.5,
        lambda x, y, z: ((1 + gap_y(x, y, z)/2)/(2 - gap_x(x, y, z)/2))**0.5,
    ]

    flattened = []
    for amplitude in amplitudes:
        coeffs, ctrls = encode(amplitude)
        flattened.extend((coeffs, ctrls))
    return tuple(flattened)
def get_coll_ops(n,ux,uy,uz,resolution=32):
    """Build the ``prep`` and ``unprep`` circuit-appending callables for one step.

    The angles and control lists are computed once via ``get_coeffs`` and
    captured by the two returned closures.

    Parameters
    ----------
    n : int
        Qubits per spatial dimension.
    ux, uy, uz : callable
        Velocity components, forwarded to ``get_coeffs``.
    resolution : int
        Forwarded to ``get_coeffs``.

    Returns
    -------
    tuple[callable, callable]
        ``prep(qc, pos_qr, dir_qr)`` and ``unprep(qc, pos_qr, dir_qr)``. Both
        append gates to ``qc`` in place; ``pos_qr`` is an iterable of
        position registers and ``dir_qr`` is indexed 0..5.
    """
    x_coeffs,x_coeff_var_indices, y_coeffs,y_coeff_var_indices, z_coeffs,z_coeff_var_indices,\
    x_coeffs_,x_coeff_var_indices_, y_coeffs_,y_coeff_var_indices_, z_coeffs_,z_coeff_var_indices_,\
    unprep1_coeffs,unprep1_coeff_var_indices, unprep2_coeffs,unprep2_coeff_var_indices = get_coeffs(n,ux,uy,uz,resolution)

    def _multiplexed_ry(qc, pos_qubits, coeffs, ctrl_sets, rot_ctrl, rot_target, negate=False):
        # One position-dependent rotation: CX(target, ctrl), then for every
        # control set the listed position qubits are CX-ed onto the target
        # followed by a controlled RY, and finally CX(target, ctrl) again.
        qc.cx(rot_target, rot_ctrl)
        for i, flips in enumerate(ctrl_sets):
            for ind in flips:
                qc.cx(pos_qubits[ind], rot_target)
            # ctrl_sets has one trailing entry without a matching angle.
            if i < len(coeffs):
                angle = -coeffs[i] if negate else coeffs[i]
                qc.cry(angle, rot_ctrl, rot_target)
        qc.cx(rot_target, rot_ctrl)

    def prep(qc,pos_qr,dir_qr):
        # Flatten the position registers once instead of once per CX gate.
        pos_qubits = [q for reg in pos_qr for q in reg]
        # Fixed, position-independent part acting on direction qubits 0, 2 and 4.
        qc.h(dir_qr[0])
        qc.h(dir_qr[4])
        qc.cx(dir_qr[0],dir_qr[2])
        qc.ry(-np.pi/4,dir_qr[4])
        qc.cx(dir_qr[2],dir_qr[4])
        qc.ry(np.pi/4,dir_qr[4])
        qc.cx(dir_qr[2],dir_qr[4])
        qc.ry(-np.pi/4,dir_qr[2])
        qc.cx(dir_qr[0],dir_qr[2])
        qc.ry(np.pi/4,dir_qr[2])
        qc.cx(dir_qr[0],dir_qr[2])
        qc.cx(dir_qr[2],dir_qr[0])
        # Position-dependent rotations, one qubit pair per axis.
        _multiplexed_ry(qc, pos_qubits, x_coeffs, x_coeff_var_indices, dir_qr[1], dir_qr[0])
        _multiplexed_ry(qc, pos_qubits, y_coeffs, y_coeff_var_indices, dir_qr[3], dir_qr[2])
        _multiplexed_ry(qc, pos_qubits, z_coeffs, z_coeff_var_indices, dir_qr[5], dir_qr[4])

    def unprep(qc,pos_qr,dir_qr):
        pos_qubits = [q for reg in pos_qr for q in reg]
        # Per-axis rotations with negated angles.
        _multiplexed_ry(qc, pos_qubits, x_coeffs_, x_coeff_var_indices_, dir_qr[1], dir_qr[0], negate=True)
        _multiplexed_ry(qc, pos_qubits, y_coeffs_, y_coeff_var_indices_, dir_qr[3], dir_qr[2], negate=True)
        _multiplexed_ry(qc, pos_qubits, z_coeffs_, z_coeff_var_indices_, dir_qr[5], dir_qr[4], negate=True)
        # Cross-axis rotations between direction qubits (2, 4) and (0, 2).
        _multiplexed_ry(qc, pos_qubits, unprep2_coeffs, unprep2_coeff_var_indices, dir_qr[4], dir_qr[2])
        _multiplexed_ry(qc, pos_qubits, unprep1_coeffs, unprep1_coeff_var_indices, dir_qr[2], dir_qr[0])
        qc.ry(-2*np.pi/3,dir_qr[0])

    return prep,unprep
def stream(qc,pos_qr,dir_qr,n):
    """Append the controlled-phase gates of the streaming step to ``qc``.

    For each axis, direction qubit ``2*axis`` applies a phase of
    ``+pi / 2**m`` and direction qubit ``2*axis + 1`` a phase of
    ``-pi / 2**m`` to position qubit ``m`` of that axis.
    """
    for axis in range(dim):
        plus_ctrl = dir_qr[2*axis]
        minus_ctrl = dir_qr[2*axis + 1]
        for bit in range(n):
            angle = np.pi / (2 ** bit)
            qc.cp(angle, plus_ctrl, pos_qr[axis][bit])
            qc.cp(-angle, minus_ctrl, pos_qr[axis][bit])
def get_circuit(n,ux,uy,uz,init_state_prep_circ,T_list,vel_resolution=32,measure=True,flag_qubits=False,midcircuit_meas=True):
    """Build one QLBM circuit per entry of ``T_list``.

    Parameters
    ----------
    n : int
        Qubits per spatial dimension.
    ux, uy, uz : callable or str
        Velocity components. When ``ux`` is a string, all three are treated
        as expression strings and converted with ``str_to_lambda``.
    init_state_prep_circ : QuantumCircuit or str
        Initial-state circuit, or a name resolved through
        ``get_named_init_state_circuit``.
    T_list : iterable of int
        Number of time steps for each circuit.
    vel_resolution : int
        Forwarded to ``get_coll_ops``.
    measure : bool
        Whether to measure the position registers at the end.
    flag_qubits : bool
        Whether to add the extra flag register and its CX/measure gates.
    midcircuit_meas : bool
        If True a single direction register is reused and measured after
        every step; otherwise one direction register is allocated per step.

    Returns
    -------
    list[QuantumCircuit]
        One circuit per entry of ``T_list``, in the same order.
    """
    ux_str,uy_str,uz_str=None,None,None
    # isinstance also accepts str subclasses (e.g. numpy.str_).
    if isinstance(ux, str):
        ux_str,uy_str,uz_str=ux,uy,uz
        ux,uy,uz=str_to_lambda(ux_str,uy_str,uz_str)
    if isinstance(init_state_prep_circ, str):
        init_state_prep_circ=get_named_init_state_circuit(n,init_state_prep_circ)

    # A velocity given as strings that mention none of x, y, z is treated as
    # uniform; the QFT pair is then applied once around all steps instead of
    # around every step. This does not depend on T_total, so decide it once.
    uniform_bool=False
    if ux_str is not None:
        combined=ux_str+uy_str+uz_str
        uniform_bool=not any(symbol in combined for symbol in ('x','y','z'))

    qc_list=[]
    prep, unprep=get_coll_ops(n,ux,uy,uz,vel_resolution)
    for T_total in T_list:
        pos_qr=[QuantumRegister(n) for _ in range(dim)]
        pos_cr=[ClassicalRegister(n) for _ in range(dim)]
        if midcircuit_meas:
            dir_qr=QuantumRegister(2*dim)
        else:
            dir_qr_list=[QuantumRegister(2*dim) for _ in range(T_total)]
        # Allocated unconditionally, only added to the circuit when flag_qubits is set.
        dir_qr_flag=QuantumRegister(2*dim)
        dir_cr=[ClassicalRegister((4 if flag_qubits and midcircuit_meas else 2)*dim) for _ in range(T_total+int(flag_qubits and not midcircuit_meas))]

        direction_regs=[dir_qr] if midcircuit_meas else dir_qr_list
        flag_regs=[dir_qr_flag] if flag_qubits else []
        qc=QuantumCircuit(*pos_qr,*direction_regs,*flag_regs,*pos_cr,*dir_cr)

        qc.compose(init_state_prep_circ,[qubit for qr in pos_qr for qubit in list(qr)], inplace=True)
        if uniform_bool:
            for i in range(dim):
                qc.compose(QFT(n, inverse=False, do_swaps=False), pos_qr[i], inplace=True)
        # Steps are emitted from index T_total-1 down to 0.
        for T in list(range(T_total))[::-1]:
            if not midcircuit_meas:
                dir_qr=dir_qr_list[T]
            prep(qc,pos_qr,dir_qr)
            if flag_qubits:
                for q1,q2 in zip(dir_qr,dir_qr_flag):
                    qc.cx(q1,q2)
            if not uniform_bool:
                for i in range(dim):
                    qc.compose(QFT(n, inverse=False, do_swaps=False), pos_qr[i], inplace=True)
            stream(qc,pos_qr,dir_qr,n)
            if not uniform_bool:
                for i in range(dim):
                    qc.compose(QFT(n, inverse=True, do_swaps=False), pos_qr[i], inplace=True)
            if flag_qubits:
                for q1,q2 in zip(dir_qr,dir_qr_flag):
                    qc.cx(q1,q2)
            unprep(qc,pos_qr,dir_qr)
            if flag_qubits and midcircuit_meas:
                qc.measure(list(dir_qr)+list(dir_qr_flag),dir_cr[T])
            else:
                qc.measure(dir_qr,dir_cr[T])
        # Without mid-circuit measurement the flag register is read out once,
        # into the extra classical register allocated above.
        if not midcircuit_meas and flag_qubits:
            qc.measure(dir_qr_flag,dir_cr[T_total])
        if uniform_bool:
            for i in range(dim):
                qc.compose(QFT(n, inverse=True, do_swaps=False), pos_qr[i], inplace=True)
        if measure:
            for i in range(dim):
                qc.measure(pos_qr[i],pos_cr[i])
        qc_list+=[qc]
    return qc_list
def str_to_lambda(vx_param,vy_param,vz_param):
    """Convert three expression strings in x, y, z into NumPy-backed callables.

    Each argument is passed through ``str`` and ``sympify`` and then
    lambdified over the symbols ``(x, y, z)``.

    Returns
    -------
    tuple
        ``(vx, vy, vz)`` callables taking ``(x, y, z)``.
    """
    # NOTE(review): SymPy documents that sympify evaluates its input with
    # eval; confirm these strings never come from untrusted users.
    texts = [str(param) for param in (vx_param, vy_param, vz_param)]
    variables = symbols('x y z')
    expressions = [sympify(text) for text in texts]
    vx, vy, vz = (
        lambdify(variables, expression, modules="numpy")
        for expression in expressions
    )
    return vx, vy, vz
def get_named_init_state_circuit(
    n: int,
    init_state_name: str,
    # Sinusoidal parameters (frequency multipliers)
    sine_k_x: float = 1.0,
    sine_k_y: float = 1.0,
    sine_k_z: float = 1.0,
    # Gaussian parameters
    gauss_cx: float = None,  # Center X (0-1 normalized), defaults to 0.5
    gauss_cy: float = None,  # Center Y (0-1 normalized), defaults to 0.5
    gauss_cz: float = None,  # Center Z (0-1 normalized), defaults to 0.5
    gauss_sigma: float = None,  # Spread in normalized units, defaults to 1.0
    # Multi-dirac-delta parameters
    mdd_kx_log2: int = 1,  # Number of dirac-deltas along x is 2^mdd_kx_log2
    mdd_ky_log2: int = 1,  # Number of dirac-deltas along y is 2^mdd_ky_log2
    mdd_kz_log2: int = 1   # Number of dirac-deltas along z is 2^mdd_kz_log2
):
    """
    Create initial state preparation circuit with configurable parameters.

    Parameters
    ----------
    n : int
        Number of qubits per spatial dimension (grid size = 2^n per axis)
    init_state_name : str
        One of "dirac_delta", "multi_dirac_delta", "sin", "gaussian".
        Any other name leaves the circuit empty.
    sine_k_x, sine_k_y, sine_k_z : float
        Frequency multipliers for sinusoidal distribution (default=1.0).
        Each is rounded to an integer and clamped to at least 1.
    gauss_cx, gauss_cy, gauss_cz : float
        Center coordinates in [0,1] for Gaussian (default=0.5)
    gauss_sigma : float
        Spread of Gaussian in normalized units (default=1.0)
    mdd_kx_log2, mdd_ky_log2, mdd_kz_log2 : int
        log2 of the number of dirac-deltas along each axis (default=1).
        Each must satisfy 0 <= value <= n-1.

    Returns
    -------
    QuantumCircuit
        State preparation circuit on 3*n qubits

    Raises
    ------
    ValueError
        If a multi_dirac_delta exponent is outside 0..n-1; the qubit index
        computed from it would otherwise land outside that axis' register.
    """
    N = 2**n
    init_state_prep_circ = QuantumCircuit(3*n)

    def _separable_state(profile_x, profile_y, profile_z):
        # Flat N^3 vector with x varying fastest and z slowest.
        return (
            np.kron(profile_z, np.ones(N**2)) *
            np.kron(np.ones(N**2), profile_x) *
            np.kron(np.ones(N), np.kron(profile_y, np.ones(N)))
        )

    def _load_amplitudes(circ, amplitudes):
        # Load the (normalized) amplitudes and lower to a fixed basis-gate set.
        circ.prepare_state(amplitudes.astype(np.complex128), normalize=True)
        return transpile(circ, basis_gates=['u1', 'u2', 'u3', 'cx'])

    if init_state_name == "dirac_delta":
        # Flip the last qubit of each axis' n-qubit slice.
        for axis in range(3):
            init_state_prep_circ.x((axis + 1)*n - 1)
    elif init_state_name == "multi_dirac_delta":
        exponents = (mdd_kx_log2, mdd_ky_log2, mdd_kz_log2)
        for label, k_log2 in zip("xyz", exponents):
            if not 0 <= k_log2 <= n - 1:
                raise ValueError(
                    f"mdd_k{label}_log2 must be between 0 and {n - 1}, got {k_log2}"
                )
        for axis, k_log2 in enumerate(exponents):
            top = (axis + 1)*n
            # Hadamards on the top k_log2 qubits of the slice, X on the next one down.
            init_state_prep_circ.h(range(top - k_log2, top))
            init_state_prep_circ.x(top - 1 - k_log2)
    elif init_state_name == "sin":
        # f(x,y,z) = 1 + sin(2π * kx * x) * sin(2π * ky * y) * sin(2π * kz * z)
        kx = max(1, int(round(float(sine_k_x))))
        ky = max(1, int(round(float(sine_k_y))))
        kz = max(1, int(round(float(sine_k_z))))
        coords = np.arange(N) / N  # Normalized [0, 1)
        init_state = 1 + _separable_state(
            np.sin(2 * np.pi * kx * coords),
            np.sin(2 * np.pi * ky * coords),
            np.sin(2 * np.pi * kz * coords),
        )
        init_state_prep_circ = _load_amplitudes(init_state_prep_circ, init_state)
    elif init_state_name == "gaussian":
        # f(x,y,z) = exp(-((x-cx)^2 + (y-cy)^2 + (z-cz)^2) / (2*sigma^2))
        cx = float(gauss_cx) if gauss_cx is not None else 0.5
        cy = float(gauss_cy) if gauss_cy is not None else 0.5
        cz = float(gauss_cz) if gauss_cz is not None else 0.5
        sigma = float(gauss_sigma) if gauss_sigma is not None else 1.0
        coords = np.arange(N) / N  # Normalized [0, 1)
        init_state = _separable_state(
            np.exp(-((coords - cx)**2) / (2 * sigma**2)),
            np.exp(-((coords - cy)**2) / (2 * sigma**2)),
            np.exp(-((coords - cz)**2) / (2 * sigma**2)),
        )
        init_state_prep_circ = _load_amplitudes(init_state_prep_circ, init_state)
    return init_state_prep_circ
##########################################################################################
from qiskit_ibm_runtime import QiskitRuntimeService, SamplerV2 as Sampler
from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager
import pprint
# import mthree
try:
# Try relative import first (best for package usage)
from .visualize_counts import load_samples, estimate_density, plot_density_isosurface, plot_density_isosurface_slider
except ImportError:
try:
# Try absolute import with package prefix
from qlbm.visualize_counts import load_samples, estimate_density, plot_density_isosurface, plot_density_isosurface_slider
except ImportError:
# Fallback to direct import (for script usage)
from visualize_counts import load_samples, estimate_density, plot_density_isosurface, plot_density_isosurface_slider
def run_sampling_hw_ibm(
    n,
    ux,
    uy,
    uz,
    init_state_prep_circ,
    T_list,
    shots=2**14,
    vel_resolution=32,
    output_resolution=40,
    logger=None,
    flag_qubits=True,
    progress_callback=None,
):
    """
    Run QLBM simulation on IBM quantum hardware.

    Parameters
    ----------
    n : int
        Number of qubits per spatial dimension
    ux, uy, uz : callable or str
        Velocity field components
    init_state_prep_circ : QuantumCircuit
        Pre-built initial state preparation circuit from get_named_init_state_circuit()
    T_list : list[int]
        List of timesteps to simulate
    shots : int
        Number of measurement shots (default: 2^14)
    vel_resolution : int
        Resolution for velocity field discretization
    output_resolution : int
        Grid resolution for density estimation output
    logger : callable, optional
        Function to log messages (e.g. print to console); falls back to print
    flag_qubits : bool
        Forwarded to get_circuit() and load_samples()
    progress_callback : callable, optional
        Function to report progress (0-100) with optional status message: progress_callback(percent, message)

    Returns
    -------
    job : IBMJob
        The submitted job object
    get_job_result : callable
        Callback taking the job; polls it, then returns (output, fig).

    Raises
    ------
    MissingCredentialError
        If the API_KEY_IBM_QLBM environment variable is unset or blank.
    """
    import time as time_module

    def log(msg):
        if logger:
            logger(str(msg))
        else:
            print(msg)

    def update_progress(percent, message=None):
        if progress_callback:
            progress_callback(percent, message)

    # === STEP 1: Circuit Generation (0-50%) ===
    log("Step 1: Generating quantum circuits...")
    update_progress(5, "Generating quantum circuits...")
    qc_list=get_circuit(n,ux,uy,uz,init_state_prep_circ,T_list,vel_resolution,flag_qubits=flag_qubits)
    log(f"Generated {len(qc_list)} circuit(s) for timesteps {T_list}")
    update_progress(15, f"Generated {len(qc_list)} circuits")
    pm_optimization_level = 3
    log("Connecting to IBM Quantum service...")
    update_progress(20, "Connecting to IBM Quantum...")
    ibm_token = _require_env("API_KEY_IBM_QLBM", context="IBM QLBM QPU execution")
    service = QiskitRuntimeService(
        channel="ibm_cloud",
        token=ibm_token,
        instance="crn:v1:bluemix:public:quantum-computing:us-east:a/15157e4350c04a9dab51b8b8a4a93c86:e29afd91-64bf-4a82-8dbf-731e6c213595::",
    )
    backend = service.least_busy()
    log(f"Selected backend: {backend.name}")
    update_progress(25, f"Backend: {backend.name}")

    # The pass manager depends only on the backend and optimization level,
    # so build it once and reuse it for every circuit.
    pm = generate_preset_pass_manager(backend=backend, optimization_level=pm_optimization_level)
    qc_compiled_list=[]
    total_circuits = len(qc_list)
    for idx, qc in enumerate(qc_list):
        circuit_progress = 25 + (idx / total_circuits) * 20  # 25-45%
        update_progress(circuit_progress, f"Transpiling circuit {idx+1}/{total_circuits}...")
        log(f"Transpiling circuit {idx+1}/{total_circuits} via PassManager...")
        qc_compiled = pm.run(qc)
        log(f"  Compiled: {qc_compiled.num_qubits} qubits, {qc_compiled.num_clbits} clbits, depth={qc_compiled.depth()}")
        qc_compiled_list+=[qc_compiled]
    log("All circuits transpiled successfully.")
    update_progress(50, "Circuits ready. Submitting job...")

    # === STEP 2: Job Submission & Monitoring (50-90%) ===
    sampler = Sampler(mode=backend)
    job = sampler.run(qc_compiled_list, shots=shots)
    job_id = job.job_id() if hasattr(job, 'job_id') else str(job)
    log(f"Job submitted! Job ID: {job_id}")
    update_progress(52, f"Job submitted: {job_id}")

    def get_job_result(j):
        """Poll job status and retrieve results with progress updates."""
        log("Monitoring job status...")
        update_progress(55, "Job queued, waiting for execution...")
        # Progress value and message reported when a status is first seen.
        status_progress_map = {
            'INITIALIZING': (55, "Job initializing..."),
            'QUEUED': (58, "Job queued, waiting..."),
            'VALIDATING': (62, "Validating job..."),
            'RUNNING': (70, "Job running on QPU..."),
            'DONE': (85, "Job completed!"),
            'ERROR': (85, "Job error occurred"),
            'CANCELLED': (85, "Job cancelled"),
        }
        last_status = None
        poll_count = 0
        max_polls = 600  # ~10 minutes with 1s interval
        while poll_count < max_polls:
            try:
                status = j.status()
                # status may be an enum-like object or a plain string.
                status_name = status.name if hasattr(status, 'name') else str(status)
                if status_name != last_status:
                    last_status = status_name
                    progress_val, status_msg = status_progress_map.get(status_name, (60, f"Status: {status_name}"))
                    log(f"Job Status: {status_name}")
                    update_progress(progress_val, status_msg)
                # Check if job is complete
                if status_name in ('DONE', 'ERROR', 'CANCELLED'):
                    break
                # Increment progress slightly while waiting (indeterminate feel)
                if status_name == 'QUEUED':
                    # Slowly increment between 58-65 while queued
                    queue_progress = 58 + min(7, poll_count * 0.05)
                    update_progress(queue_progress, f"Queued... (waiting {poll_count}s)")
                elif status_name == 'RUNNING':
                    # Slowly increment between 70-85 while running
                    run_progress = 70 + min(15, poll_count * 0.1)
                    update_progress(run_progress, f"Running on QPU... ({poll_count}s)")
                time_module.sleep(1)
                poll_count += 1
            except Exception as e:
                # Best effort: a failed status check is logged and retried.
                log(f"Status check error: {e}")
                time_module.sleep(2)
                poll_count += 2
        # Reached both when the job finished and when polling gave up.
        log("Retrieving job results...")
        update_progress(87, "Retrieving results...")
        result = j.result()
        log("Results retrieved successfully.")
        update_progress(90, "Processing results...")

        # === STEP 3: Creating Plots (90-100%) ===
        output=[]
        total_timesteps = len(T_list)
        for idx, (T_total, pub) in enumerate(zip(T_list, result)):
            plot_progress = 90 + (idx / total_timesteps) * 8  # 90-98%
            update_progress(plot_progress, f"Processing timestep T={T_total}...")
            try:
                joined = pub.join_data()
                joined_counts = joined.get_counts()
            except Exception as e:
                log(f"Error retrieving counts for T={T_total}: {e}")
                # NOTE(review): None is handed to load_samples below; confirm
                # that load_samples accepts it.
                joined_counts = None
            pts, counts = load_samples(joined_counts, T_total, logger=None, flag_qubits=flag_qubits)
            output+=[estimate_density(pts, counts, bandwidth=0.05, grid_size=output_resolution)]
        log(f"Processing complete: {len(output)} timestep(s)")
        update_progress(98, "Creating visualization...")
        fig = plot_density_isosurface_slider(output, T_list)
        update_progress(100, "Complete!")
        log("IBM QPU job completed successfully.")
        return output, fig

    return job, get_job_result
from qiskit_ionq import IonQProvider
class MissingCredentialError(RuntimeError):
    """Signal that a required API key or secret was not provided at runtime.

    Raised by ``_require_env`` when the requested environment variable is
    unset or blank.
    """
def _require_env(var_name: str, *, context: str) -> str:
    """Fetch a mandatory environment variable.

    Parameters
    ----------
    var_name : str
        Name of the environment variable to read.
    context : str
        Short description of the operation that needs it, used in the error text.

    Returns
    -------
    str
        The variable's value exactly as stored (not stripped).

    Raises
    ------
    MissingCredentialError
        If the variable is unset or contains only whitespace.
    """
    secret = os.environ.get(var_name)
    if secret is not None and str(secret).strip() != "":
        return secret
    raise MissingCredentialError(
        f"Missing required secret '{var_name}'. "
        f"Set it as a runtime environment variable (e.g., Hugging Face Space → Settings → Secrets) "
        f"before running {context}."
    )
def run_sampling_hw_ionq(
    n,
    ux,
    uy,
    uz,
    init_state_prep_circ,
    T_list,
    shots=2**14,
    vel_resolution=32,
    output_resolution=40,
    logger=None,
    flag_qubits=True,
    progress_callback=None,
):
    """
    Run QLBM simulation on IonQ quantum hardware.

    Parameters
    ----------
    n : int
        Number of qubits per spatial dimension
    ux, uy, uz : callable or str
        Velocity field components
    init_state_prep_circ : QuantumCircuit
        Pre-built initial state preparation circuit from get_named_init_state_circuit()
    T_list : list[int]
        List of timesteps to simulate
    shots : int
        Number of measurement shots (default: 2^14)
    vel_resolution : int
        Resolution for velocity field discretization
    output_resolution : int
        Grid resolution for density estimation output
    logger : callable, optional
        Function to log messages (e.g. print to console); falls back to print
    flag_qubits : bool
        Forwarded to get_circuit() and load_samples()
    progress_callback : callable, optional
        Function to report progress (0-100) with optional status message: progress_callback(percent, message)

    Returns
    -------
    job : IonQ Job
        The submitted job object
    get_job_result : callable
        Callback taking the job; polls it, then returns (output, fig).

    Raises
    ------
    MissingCredentialError
        If the API_KEY_IONQ_QLBM environment variable is unset or blank.
    """
    import time as time_module
    def log(msg):
        if logger:
            logger(str(msg))
        else:
            print(msg)
    def update_progress(percent, message=None):
        if progress_callback:
            progress_callback(percent, message)
    # === STEP 1: Circuit Generation (0-50%) ===
    log("Step 1: Generating quantum circuits...")
    update_progress(5, "Generating quantum circuits...")
    # Ensure credentials are present (HF secrets) and make them discoverable
    # by qiskit-ionq's default environment variable names.
    # NOTE(review): setdefault leaves an already-set IONQ_API_TOKEN untouched,
    # so a pre-existing value takes precedence over API_KEY_IONQ_QLBM.
    ionq_token = _require_env("API_KEY_IONQ_QLBM", context="IonQ QLBM QPU execution")
    os.environ.setdefault("IONQ_API_TOKEN", ionq_token)
    provider = IonQProvider()
    # backend = provider.get_backend("simulator")
    backend = provider.get_backend("qpu.forte-enterprise-1")
    # backend.name may be a plain string or a method depending on the
    # installed version; handle both.
    backend_name = backend.name if isinstance(backend.name, str) else backend.name()
    log(f"Selected IonQ backend: {backend_name}")
    update_progress(15, f"Backend: {backend_name}")
    # No mid-circuit measurement here: get_circuit allocates one direction
    # register per time step instead of reusing a single one.
    qc_list=get_circuit(n,ux,uy,uz,init_state_prep_circ,T_list,vel_resolution,flag_qubits=flag_qubits,midcircuit_meas=False)
    log(f"Generated {len(qc_list)} circuit(s) for timesteps {T_list}")
    update_progress(45, f"Generated {len(qc_list)} circuits")
    # Transpile circuits for IonQ with optimization_level=1 (recommended by IonQ)
    log("Transpiling circuits for IonQ (optimization_level=1)...")
    qc_list_transpiled = transpile(qc_list, backend=backend, optimization_level=1)
    update_progress(48, "Circuits transpiled")
    # === STEP 2: Job Submission (50%) ===
    log("Submitting job to IonQ...")
    update_progress(50, "Submitting job to IonQ...")
    job = backend.run(qc_list_transpiled, shots=shots)
    job_id = job.job_id() if hasattr(job, 'job_id') else str(job)
    log(f"Job submitted! Job ID: {job_id}")
    update_progress(52, f"Job submitted: {job_id}")
    def get_job_result(j):
        """Poll job status and retrieve results with progress updates."""
        log("Monitoring IonQ job status...")
        update_progress(55, "Job queued, waiting for execution...")
        # IonQ job status polling
        last_status = None
        poll_count = 0
        max_polls = 60000 # 60000 polls at ~1s each, i.e. roughly 16.7 hours
        while poll_count < max_polls:
            try:
                status = j.status()
                # status may be an enum-like object or a plain string.
                status_name = status.name if hasattr(status, 'name') else str(status)
                # Report once per status change.
                if status_name != last_status:
                    last_status = status_name
                    log(f"Job Status: {status_name}")
                    if status_name in ('QUEUED', 'VALIDATING'):
                        update_progress(58, f"Status: {status_name}")
                    elif status_name == 'RUNNING':
                        update_progress(70, "Job running on IonQ QPU...")
                    elif status_name == 'DONE':
                        update_progress(85, "Job completed!")
                        break
                    elif status_name in ('ERROR', 'CANCELLED'):
                        update_progress(85, f"Job {status_name.lower()}")
                        break
                # Increment progress slightly while waiting
                if status_name == 'QUEUED':
                    queue_progress = 58 + min(7, poll_count * 0.05)
                    update_progress(queue_progress, f"Queued... (waiting {poll_count}s)")
                elif status_name == 'RUNNING':
                    run_progress = 70 + min(15, poll_count * 0.1)
                    update_progress(run_progress, f"Running on IonQ... ({poll_count}s)")
                # Check if done
                if status_name in ('DONE', 'ERROR', 'CANCELLED'):
                    break
                time_module.sleep(1)
                poll_count += 1
            except Exception as e:
                # Best effort: a failed status check is logged and retried.
                log(f"Status check error: {e}")
                time_module.sleep(2)
                poll_count += 2
        # Reached for every terminal status and also when polling gave up.
        # NOTE(review): after ERROR/CANCELLED the get_counts call below is
        # still attempted — confirm how the job object behaves in that case.
        log("Retrieving IonQ job results...")
        update_progress(87, "Retrieving results...")
        # === STEP 3: Creating Plots (90-100%) ===
        update_progress(90, "Processing results...")
        output=[]
        total_timesteps = len(T_list)
        for i, T_total in enumerate(T_list):
            plot_progress = 90 + (i / total_timesteps) * 8 # 90-98%
            update_progress(plot_progress, f"Processing timestep T={T_total}...")
            # Counts of the i-th submitted circuit, matching T_list order.
            counts = j.get_counts(i)
            pts, counts = load_samples(counts, T_total, logger=None, flag_qubits=flag_qubits, midcircuit_meas=False)
            output+=[estimate_density(pts, counts, bandwidth=0.05, grid_size=output_resolution)]
        log(f"Processing complete: {len(output)} timestep(s)")
        update_progress(98, "Creating visualization...")
        fig = plot_density_isosurface_slider(output, T_list)
        update_progress(100, "Complete!")
        log("IonQ job completed successfully.")
        return output, fig
    return job, get_job_result
from qiskit_aer import AerSimulator
def run_sampling_sim(
    n,
    ux,
    uy,
    uz,
    init_state_prep_circ,
    T_list,
    vel_resolution=32,
    progress_callback=None,
):
    """
    Run QLBM simulation on local Aer statevector simulator.

    Parameters
    ----------
    n : int
        Number of qubits per spatial dimension
    ux, uy, uz : callable or str
        Velocity field components
    init_state_prep_circ : QuantumCircuit or str
        Initial state preparation circuit, or a name that get_circuit()
        resolves through get_named_init_state_circuit()
    T_list : list[int]
        List of timesteps to simulate
    vel_resolution : int
        Resolution for velocity field discretization
    progress_callback : callable, optional
        Function to report progress (0-100); called with the percentage only

    Returns
    -------
    output : list[ndarray]
        List of 3D arrays, one per timestep (real parts of the phase-aligned
        statevector amplitudes)
    fig : go.Figure
        Plotly figure with slider animation through all timesteps (includes T=0 snapshot when available)
    """
    # if type(ux)==str:
    # ux,uy,uz=str_to_lambda(ux,uy,uz)
    # # Convert string init_state_prep_circ to circuit if needed (matches original logic)
    init_state_label = init_state_prep_circ if isinstance(init_state_prep_circ, str) else "custom"
    # if isinstance(init_state_prep_circ, str):
    # init_state_prep_circ=get_named_init_state_circuit(n,init_state_prep_circ)
    # The T=0 snapshot is optional: any failure only prints a warning.
    initial_snapshot = None
    try:
        initial_snapshot = show_initial_distribution(
            n=n,
            init_state_name=str(init_state_label),
            plot=False,
            return_data=True,
            init_state_circuit=init_state_prep_circ,
        )
    except Exception as exc:
        print(f"Warning: Unable to compute initial distribution snapshot: {exc}")
    # Position registers are left unmeasured; direction registers are still
    # measured inside get_circuit.
    qc_list=get_circuit(n,ux,uy,uz,init_state_prep_circ,T_list,vel_resolution,measure=False)
    backend = AerSimulator(method = 'statevector')
    output=[]
    total_steps = len(qc_list)
    for i, qc in enumerate(qc_list):
        if progress_callback:
            percent = int((i / total_steps) * 100)
            progress_callback(percent)
        # No transpilation happens here; this is the same circuit object.
        qc_transpiled=qc
        # presumably stores one statevector per classical-register outcome —
        # confirm against the Aer save_statevector documentation
        qc_transpiled.save_statevector(conditional=True)
        # Try multiple shots to find a successful (zero-branch) outcome
        max_attempts = 100
        success = False
        for attempt in range(max_attempts):
            job = backend.run(qc_transpiled, memory=True, shots=1)
            result = job.result()
            data_all = result.data()
            statevector_keys = list(dict(data_all['statevector']).keys())
            # Look for the zero branch (0x0)
            zero_key = None
            for key in statevector_keys:
                if '0x' in key:
                    # Hex-formatted key: zero branch when its value is 0.
                    if int(key[2:], 16) == 0:
                        zero_key = key
                        break
                elif key == '0' or key == '00' or key.replace('0', '') == '':
                    # Non-hex key consisting only of '0' characters.
                    zero_key = key
                    break
            if zero_key is not None:
                success = True
                break
            if attempt < max_attempts - 1:
                print(f"Attempt {attempt + 1} failed (got branch {statevector_keys[0]}), retrying...")
        if not success:
            # If all attempts failed, use the first available branch with a warning
            print(f"Warning: Could not get zero branch after {max_attempts} attempts. Using first available branch.")
            zero_key = statevector_keys[0]
        zero_branch_state = data_all['statevector'][zero_key]
        # Keep the first (2**n)**dim amplitudes and divide out the phase of
        # their mean so that the real part can be taken.
        # NOTE(review): a mean of exactly zero makes sv_phase a 0/0 division.
        sv_mean=np.mean(np.array(zero_branch_state)[:(2**n)**dim])
        sv_phase=sv_mean/np.abs(sv_mean)
        final_answer = np.real(np.array(zero_branch_state)[:(2**n)**dim]/sv_phase)
        C = np.reshape(np.array(final_answer),tuple(2**n for _ in range(dim)))
        output+=[C]
    # Create meshgrid for coordinates (used for plotting)
    x_coords = np.linspace(0, 1, 2**n)
    X = np.meshgrid(x_coords, x_coords, x_coords, indexing='ij')
    # Plot from a copy so the returned `output` is never altered by the snapshot.
    outputs_for_plot = output.copy()
    times_for_plot = list(T_list)
    if initial_snapshot is not None:
        initial_density, _ = initial_snapshot
        if times_for_plot and times_for_plot[0] == 0:
            # A requested T=0 frame is replaced by the snapshot ...
            outputs_for_plot[0] = initial_density
        else:
            # ... otherwise the snapshot is prepended as an extra T=0 frame.
            outputs_for_plot = [initial_density] + outputs_for_plot
            times_for_plot = [0] + times_for_plot
    # Create figure with slider for all timesteps (including T=0 if available)
    fig = _create_slider_figure(outputs_for_plot, times_for_plot, X)
    return output, fig
def _create_slider_figure(output_list, T_list, X):
    """
    Create a Plotly figure with a slider to step through timesteps.

    One ``go.Isosurface`` trace is added per (density, timestep) pair and the
    slider toggles trace visibility instead of using Plotly frames.

    Parameters
    ----------
    output_list : list[ndarray]
        Density arrays, one per timestep. Each array is flattened for plotting.
    T_list : list[int]
        Timestep labels, paired positionally with ``output_list``. If the two
        lists differ in length, only the leading common pairs are plotted so
        that traces, slider steps and visibility masks always agree.
    X : tuple of ndarrays
        Meshgrid coordinates. ``X[2]``, ``X[1]`` and ``X[0]`` are flattened and
        used as the x, y and z coordinates of every trace, respectively.

    Returns
    -------
    fig : go.Figure
        Figure with one isosurface trace per timestep and a slider that makes
        exactly one trace visible at a time.

    Raises
    ------
    ValueError
        If there is no (density, timestep) pair to plot.
    """
    # Pair densities with their timestep labels once; every later stage
    # (traces, slider steps, title) is derived from this single list.
    pairs = list(zip(output_list, T_list))
    if not pairs:
        raise ValueError(
            "_create_slider_figure needs at least one (output, timestep) pair; "
            "output_list and T_list must both be non-empty."
        )
    num_traces = len(pairs)

    # Compute global min/max for consistent color scaling across timesteps
    global_min = min(np.min(C) for C, _ in pairs)
    global_max = max(np.max(C) for C, _ in pairs)

    # The coordinates are identical for every trace, so flatten them only once.
    x_flat = X[2].flatten()
    y_flat = X[1].flatten()
    z_flat = X[0].flatten()

    fig = go.Figure()

    # Add a trace for each timestep; only the first one starts out visible
    for idx, (C, T) in enumerate(pairs):
        fig.add_trace(go.Isosurface(
            x=x_flat,
            y=y_flat,
            z=z_flat,
            value=C.flatten(),
            isomin=global_min,
            isomax=global_max,
            opacity=0.4,
            surface_count=10,
            caps=dict(x_show=False, y_show=False, z_show=False),
            colorscale=QLBM_PLOT_COLORSCALE,
            colorbar=dict(title="Density"),
            visible=(idx == 0),
            name=f"T={T}"
        ))

    # Create slider steps: step ``idx`` shows trace ``idx`` and hides the rest
    steps = []
    for idx, (_, T) in enumerate(pairs):
        visibility = [trace_idx == idx for trace_idx in range(num_traces)]
        steps.append(dict(
            method="update",
            args=[{"visible": visibility},
                  {"title": f"QLBM Simulation - Timestep T={T}"}],
            label=str(T)
        ))

    sliders = [dict(
        active=0,
        currentvalue={"prefix": "Timestep: "},
        pad={"t": 50},
        steps=steps
    )]

    fig.update_layout(
        title=f"QLBM Simulation - Timestep T={pairs[0][1]}",
        scene=dict(
            xaxis_title="X",
            yaxis_title="Y",
            zaxis_title="Z",
            aspectmode='cube',
        ),
        sliders=sliders
    )
    return fig
def show_initial_distribution(
    n: int,
    init_state_name: str = "sin",
    # Sinusoidal parameters (frequency multipliers)
    sine_k_x: float = 1.0,
    sine_k_y: float = 1.0,
    sine_k_z: float = 1.0,
    # Gaussian parameters
    gauss_cx: float = None,
    gauss_cy: float = None,
    gauss_cz: float = None,
    gauss_sigma: float = None,
    # Multi-dirac-delta parameters
    mdd_kx_log2: int = 1,
    mdd_ky_log2: int = 1,
    mdd_kz_log2: int = 1,
    # Display options
    plot: bool = True,
    return_data: bool = False,
    init_state_circuit=None,
):
    """
    Simulate a state preparation circuit and show the resulting distribution.

    The circuit is run on an ``AerSimulator`` statevector backend; the real
    part of the saved statevector is reshaped onto a ``2^n x 2^n x 2^n`` grid
    and, optionally, summarized on stdout and drawn as a 3D isosurface.

    Parameters
    ----------
    n : int
        Number of qubits per spatial dimension (grid size = 2^n per axis)
    init_state_name : str
        Preset name forwarded to ``get_named_init_state_circuit``, e.g.
        "dirac_delta", "sin", "gaussian" or "multi_dirac_delta". Also used in
        the printed summary and the plot title.
    sine_k_x, sine_k_y, sine_k_z : float
        Frequency multipliers for the sinusoidal preset (default=1.0)
    gauss_cx, gauss_cy, gauss_cz : float
        Center coordinates for the Gaussian preset. ``None`` is forwarded
        unchanged to ``get_named_init_state_circuit``; the printed summary
        reports 0.5 in that case.
    gauss_sigma : float
        Spread of the Gaussian preset. ``None`` is forwarded unchanged; the
        printed summary reports 0.2 in that case.
    mdd_kx_log2, mdd_ky_log2, mdd_kz_log2 : int
        log2 of the frequency multipliers for the multi-dirac-delta preset
        (default=1)
    plot : bool
        Whether to print the summary and display the isosurface (default=True)
    return_data : bool
        Whether to return the distribution data (default=False)
    init_state_circuit : QuantumCircuit, optional
        Pre-built circuit to evaluate. When provided, no preset circuit is
        constructed and a copy of the supplied circuit is simulated instead.

    Returns
    -------
    If return_data is True:
        C : ndarray
            3D array of shape (2^n, 2^n, 2^n) containing the initial distribution
        X : tuple of ndarrays
            Meshgrid coordinates (X[0], X[1], X[2]) built with ``indexing='ij'``
    If return_data is False:
        None
    """
    grid_size = 2**n

    # Pick the circuit to simulate: a preset built from the named parameters,
    # or a private copy of the caller-supplied circuit.
    if init_state_circuit is None:
        preset_kwargs = dict(
            sine_k_x=sine_k_x,
            sine_k_y=sine_k_y,
            sine_k_z=sine_k_z,
            gauss_cx=gauss_cx,
            gauss_cy=gauss_cy,
            gauss_cz=gauss_cz,
            gauss_sigma=gauss_sigma,
            mdd_kx_log2=mdd_kx_log2,
            mdd_ky_log2=mdd_ky_log2,
            mdd_kz_log2=mdd_kz_log2,
        )
        init_state_prep_circ = get_named_init_state_circuit(
            n, init_state_name, **preset_kwargs
        )
    else:
        init_state_prep_circ = init_state_circuit.copy()

    simulator = AerSimulator(method='statevector')

    # Work on a copy so the save instruction never lands on the source circuit.
    sim_circuit = init_state_prep_circ.copy()
    sim_circuit.save_statevector()
    sim_result = simulator.run(sim_circuit, shots=1).result()
    amplitudes = np.array(sim_result.get_statevector())

    # Keep only the real part of the amplitudes and lay them out on the 3D grid.
    C = np.real(amplitudes).reshape((grid_size, grid_size, grid_size))

    axis_coords = np.linspace(0, 1, grid_size)
    X = np.meshgrid(axis_coords, axis_coords, axis_coords, indexing='ij')

    if plot:
        c_min, c_max = np.min(C), np.max(C)

        print(f"Initial distribution: {init_state_name}")
        print(f"Grid size: {grid_size} x {grid_size} x {grid_size}")
        if init_state_name == "sin":
            print(f"Sine frequencies: kx={sine_k_x}, ky={sine_k_y}, kz={sine_k_z}")
        elif init_state_name == "gaussian":
            # Values shown when the caller left a Gaussian parameter unset.
            cx, cy, cz = (
                0.5 if value is None else float(value)
                for value in (gauss_cx, gauss_cy, gauss_cz)
            )
            sigma = 0.2 if gauss_sigma is None else float(gauss_sigma)
            print(f"Gaussian center: ({cx}, {cy}, {cz}), sigma={sigma}")
        elif init_state_name == "multi_dirac_delta":
            print(f"Multi-Dirac-Delta: kx_log2={mdd_kx_log2}, ky_log2={mdd_ky_log2}, kz_log2={mdd_kz_log2}")
            print(f" Number of peaks: {2**mdd_kx_log2} x {2**mdd_ky_log2} x {2**mdd_kz_log2}")
        print("Distribution stats:")
        print(f" Min: {c_min:.6f}, Max: {c_max:.6f}")
        print(f" Mean: {np.mean(C):.6f}, Std: {np.std(C):.6f}")

        isosurface = go.Isosurface(
            x=X[2].flatten(),
            y=X[1].flatten(),
            z=X[0].flatten(),
            value=C.flatten(),
            isomin=c_min,
            isomax=c_max,
            opacity=0.4,
            surface_count=10,
            caps=dict(x_show=False, y_show=False, z_show=False),
            colorscale=QLBM_PLOT_COLORSCALE,
        )
        fig = go.Figure(data=isosurface)
        fig.update_layout(
            title=f"Initial Distribution: {init_state_name}",
            scene=dict(
                xaxis_title="X",
                yaxis_title="Y",
                zaxis_title="Z",
            ),
        )
        fig.show()

    return (C, X) if return_data else None
if __name__ == "__main__":
    n = 3

    # Optional: build a named initial state circuit up front.
    # init_state_prep_circ = get_named_init_state_circuit(
    #     n=n,
    #     init_state_name="multi_dirac_delta",  # or "gaussian", "dirac_delta"
    # )

    # Alternative: run on the local simulator instead of hardware.
    # output, fig = run_sampling_sim(
    #     n=n,
    #     ux="sin(-2*pi*z)",
    #     uy="1",
    #     uz="sin(2*pi*x)",
    #     init_state_prep_circ="multi_dirac_delta",
    #     T_list=[1,3,5,7,9],
    #     vel_resolution=16
    # )
    # print(output)
    # fig.show(renderer="browser")

    # Optional: preview the initial distribution.
    # show_initial_distribution(n=n, init_state_name="sin", sine_k_x=1, sine_k_y=1, sine_k_z=1)

    # Submit the sampling run through run_sampling_hw_ionq. The initial state
    # is given here as a preset name string; no circuit object is built above.
    hw_run_kwargs = dict(
        n=n,
        ux="1",
        uy="1",
        uz="1",
        init_state_prep_circ="multi_dirac_delta",
        T_list=[1, 2],
        shots=2**15,
        vel_resolution=2,
        output_resolution=16,
    )
    job, get_job_result = run_sampling_hw_ionq(**hw_run_kwargs)

    # Turn the job into (output, figure) and open the figure in a browser.
    output, fig = get_job_result(job)
    fig.show(renderer="browser")