Spaces:
Paused
Paused
| from qiskit import QuantumCircuit,QuantumRegister,ClassicalRegister,transpile | |
| from qiskit.synthesis.qft import synth_qft_full as QFT | |
| import numpy as np | |
| import os | |
| from sympy import sympify, symbols, lambdify | |
| from qiskit_ibm_runtime import QiskitRuntimeService | |
| import plotly.graph_objects as go | |
# Number of spatial axes; the circuit builders allocate one position register
# per axis and two direction qubits per axis.
dim = 3
# Colour-scale name — presumably consumed by the Plotly figures; not used in
# the visible part of this module.
QLBM_PLOT_COLORSCALE = "Turbo"
def bin_to_gray(bin_s):
    """Convert a binary bit string to its reflected Gray code.

    The first character is copied unchanged; every following output bit is
    the XOR of two neighbouring input bits.

    Parameters
    ----------
    bin_s : str
        String of '0'/'1' characters, most significant bit first.

    Returns
    -------
    str
        Gray-coded string of the same length ("" for empty input).
    """
    if not bin_s:
        return ""
    flags = [bool(int(ch)) for ch in bin_s]
    tail = ("1" if a != b else "0" for a, b in zip(flags, flags[1:]))
    return bin_s[0] + "".join(tail)
def gray_to_bin(gray_s):
    """Convert a reflected-Gray-code bit string back to plain binary.

    The first character is copied unchanged; each following output bit is the
    XOR of the previously produced output bit and the next Gray bit.

    Parameters
    ----------
    gray_s : str
        String of '0'/'1' characters, most significant bit first.

    Returns
    -------
    str
        Binary string of the same length ("" for empty input).
    """
    if not gray_s:
        return ""
    out = [gray_s[0]]
    running = bool(int(gray_s[0]))
    for ch in gray_s[1:]:
        running = running != bool(int(ch))
        out.append("1" if running else "0")
    return "".join(out)
def bin_to_int(bin_s):
    """Return the integer value of the base-2 string *bin_s*."""
    return int(bin_s, base=2)
def int_to_bin(i, pad):
    """Return *i* as a binary string, left-padded with zeros to *pad* digits.

    Values that need more than *pad* digits are returned at full length
    (nothing is truncated).

    Raises
    ------
    ValueError
        If *i* is negative. Slicing ``bin(i)`` would otherwise yield a
        malformed string such as ``'b101'``.
    """
    if i < 0:
        raise ValueError(f"int_to_bin expects a non-negative integer, got {i}")
    return bin(i)[2:].zfill(pad)
def fwht_approx(f, N, num_points_per_dim, threshold=1e-10):
    """Build a sparse, truncated table of transformed rotation angles for *f*.

    The N x N x N grid is split into ``num_points_per_dim`` cubic blocks per
    axis. For every block the angle ``2*arccos(f(...))`` is sampled at the
    block centre, together with one finite-difference slope per axis. These
    values are stored sparsely in a dict keyed by the flat index
    ``z*N**2 + y*N + x`` and then combined with normalised butterflies
    ``(a+b)/2, (a-b)/2`` (a Walsh-Hadamard style transform, as the function
    name suggests). Finally the smallest-magnitude entries are dropped.

    NOTE(review): the loop nesting below was reconstructed from a copy of the
    file whose indentation had been stripped; confirm against the upstream
    source before relying on it for ``N // num_points_per_dim > 1``.

    Parameters
    ----------
    f : callable
        ``f(x, y, z)`` in grid coordinates; its value is passed to
        ``np.arccos``, so it should lie in [-1, 1].
    N : int
        Grid points per axis. The caller in this file passes a power of two.
    num_points_per_dim : int
        Number of blocks per axis.
    threshold : float
        Budget for the cumulative absolute value of discarded entries.

    Returns
    -------
    (list, list)
        The retained angles and their flat-index keys, in insertion order.
    """
    linear_block_size = int(N // num_points_per_dim)
    num_angles_per_block = int(np.log2(linear_block_size))
    half_block = (linear_block_size - 1) / 2
    plane = N**2
    volume = N**3

    def angle(x, y, z):
        return 2 * np.arccos(f(x, y, z))

    def butterfly(index, stride):
        a = thetas[index]
        b = thetas[index + stride]
        thetas[index] = (a + b) / 2
        thetas[index + stride] = (a - b) / 2

    # --- per-block samples: centre value plus one slope per axis -----------
    thetas = {}
    for k in range(num_points_per_dim):
        for j in range(num_points_per_dim):
            for i in range(num_points_per_dim):
                x0 = i * linear_block_size
                y0 = j * linear_block_size
                z0 = k * linear_block_size
                xc = x0 + half_block
                yc = y0 + half_block
                zc = z0 + half_block
                base = k * plane * linear_block_size + j * N * linear_block_size + i * linear_block_size

                thetas[base] = angle(xc, yc, zc)
                # Slope = (angle at this block's lower face - angle at the
                # next block's lower face) / block size.
                slope_x = (angle(x0, yc, zc) - angle(((i + 1) % N) * linear_block_size, yc, zc)) / linear_block_size
                slope_y = (angle(xc, y0, zc) - angle(xc, ((j + 1) % N) * linear_block_size, zc)) / linear_block_size
                slope_z = (angle(xc, yc, z0) - angle(xc, yc, ((k + 1) % N) * linear_block_size)) / linear_block_size
                for m in range(num_angles_per_block):
                    scale = 2 ** (m - 1)
                    thetas[base + 2**m] = slope_x * scale
                    thetas[base + N * (2**m)] = slope_y * scale
                    thetas[base + plane * (2**m)] = slope_z * scale

    # --- butterflies across blocks -----------------------------------------
    h = linear_block_size
    while h < volume:
        for start in range(0, volume, h * 2):
            # Only rows/planes that hold a block origin carry entries.
            if (start // N) % linear_block_size != 0:
                continue
            if (start // plane) % linear_block_size != 0:
                continue
            j = start
            while j < start + h:
                butterfly(j, h)
                for ax in range(3):
                    for m in range(num_angles_per_block):
                        butterfly(j + (N**ax) * (2**m), h)
                # Advance to the next block origin, skipping the rows and
                # planes inside a block.
                j += linear_block_size
                if (j // N) % linear_block_size == 1:
                    j += (linear_block_size - 1) * N
                if (j // plane) % linear_block_size == 1:
                    j += (linear_block_size - 1) * plane
        h *= 2
        # Jump the stride over the intra-block bits of the next axis.
        if h == N:
            h = N * linear_block_size
        if h == plane:
            h = plane * linear_block_size

    # --- truncation ---------------------------------------------------------
    # Walk magnitudes in ascending order; once the running sum exceeds the
    # budget, the cut-off becomes the running sum just before that entry.
    cumulative = 0
    for magnitude in sorted(np.abs(list(thetas.values()))):
        cumulative += magnitude
        if cumulative > threshold:
            threshold = cumulative - magnitude
            break

    kept = [(key, theta) for key, theta in thetas.items() if abs(theta) > threshold]
    return [theta for _, theta in kept], [key for key, _ in kept]
def get_circuit_inputs(f, num_reg_qubits, num_points_per_dim):
    """Turn *f* into an ordered list of rotation angles plus CX control sets.

    The retained coefficients of ``fwht_approx`` (threshold 1e-4) are ordered
    by the Gray-decoded value of their index. For each consecutive pair of
    Gray codes along that order, the bit positions that differ (bit 0 = least
    significant) are recorded; a final entry returns to the all-zero code.

    Parameters
    ----------
    f : callable
        Function ``f(x, y, z)`` in grid coordinates.
    num_reg_qubits : int
        Qubits per axis; the grid has ``2**num_reg_qubits`` points per axis.
    num_points_per_dim : int
        Blocks per axis handed to ``fwht_approx``.

    Returns
    -------
    (list, list[list[int]])
        ``angles`` and ``ctrls`` with ``len(ctrls) == len(angles) + 1``.
    """
    width = 3 * num_reg_qubits
    theta_vec, indices = fwht_approx(f, 2**num_reg_qubits, num_points_per_dim, 1e-4)

    circ_pos = [bin_to_int(gray_to_bin(int_to_bin(ind, width))) for ind in indices]
    ordered = sorted(zip(theta_vec, circ_pos), key=lambda pair: pair[1])

    def flipped_bits(old_bs, new_bs):
        # Positions (counted from the right-most character) where the two
        # equal-length bit strings differ.
        return [pos for pos, (a, b) in enumerate(zip(old_bs[::-1], new_bs[::-1])) if a != b]

    zero_bs = "0" * width
    ctrls = []
    current_bs = zero_bs
    for _, position in ordered:
        new_bs = bin_to_gray(int_to_bin(position % (2**width), width))
        ctrls.append(flipped_bits(current_bs, new_bs))
        current_bs = new_bs
    # Closing step: undo whatever bits are still set.
    ctrls.append(flipped_bits(current_bs, zero_bs))

    return [theta for theta, _ in ordered], ctrls
def get_coeffs(n, ux, uy, uz, resolution=32):
    """Compute every angle/control table needed by the collision operators.

    Each table comes from ``get_circuit_inputs`` applied to an amplitude
    function built from the velocity components, which are evaluated at grid
    coordinates divided by ``2**n``.

    Parameters
    ----------
    n : int
        Qubits per axis.
    ux, uy, uz : callable
        Velocity components ``u(x, y, z)`` on normalised coordinates.
    resolution : int
        Upper bound on the number of blocks per axis (capped at ``2**n``).

    Returns
    -------
    tuple
        16 items, ``(coeffs, control_sets)`` pairs flattened in this order:
        x, y, z (forward), x_, y_, z_ (backward), unprep1, unprep2.
    """
    current_N = 2**n
    points = min(current_N, resolution)

    def at(u, x, y, z):
        return u(x / current_N, y / current_N, z / current_N)

    def forward(u):
        # sqrt((1 + u) / 2) at the grid point itself.
        return lambda x, y, z: ((1 + at(u, x, y, z)) / 2) ** 0.5

    def backward(u, dx, dy, dz):
        # Uses u one step behind and one step ahead along (dx, dy, dz);
        # yields 0 when the numerator vanishes.
        def func(x, y, z):
            behind = at(u, x - dx, y - dy, z - dz)
            if (1 + behind) == 0:
                return 0
            ahead = at(u, x + dx, y + dy, z + dz)
            return ((1 + behind) / (2 + behind - ahead)) ** 0.5
        return func

    def x_difference(x, y, z):
        return at(ux, x - 1, y, z) - at(ux, x + 1, y, z)

    def y_difference(x, y, z):
        return at(uy, x, y - 1, z) - at(uy, x, y + 1, z)

    def unprep1(x, y, z):
        return (1 / 3**0.5) * (1 + x_difference(x, y, z) / 2) ** 0.5

    def unprep2(x, y, z):
        return ((1 + y_difference(x, y, z) / 2) / (2 - x_difference(x, y, z) / 2)) ** 0.5

    amplitude_functions = (
        forward(ux),
        forward(uy),
        forward(uz),
        backward(ux, 1, 0, 0),
        backward(uy, 0, 1, 0),
        backward(uz, 0, 0, 1),
        unprep1,
        unprep2,
    )
    tables = [get_circuit_inputs(func, n, points) for func in amplitude_functions]
    return tuple(item for table in tables for item in table)
def get_coll_ops(n, ux, uy, uz, resolution=32):
    """Create the ``prep`` / ``unprep`` gate-emitting closures for one field.

    The angle and control tables are computed once via ``get_coeffs``; the
    returned functions only append gates.

    Parameters
    ----------
    n : int
        Qubits per axis.
    ux, uy, uz : callable
        Velocity components.
    resolution : int
        Forwarded to ``get_coeffs``.

    Returns
    -------
    (callable, callable)
        ``prep(qc, pos_qr, dir_qr)`` and ``unprep(qc, pos_qr, dir_qr)``.
        ``pos_qr`` is an iterable of position registers and ``dir_qr`` is
        indexable with at least six qubits. Both mutate ``qc`` in place and
        return ``None``.
    """
    (x_coeffs, x_coeff_var_indices,
     y_coeffs, y_coeff_var_indices,
     z_coeffs, z_coeff_var_indices,
     x_coeffs_, x_coeff_var_indices_,
     y_coeffs_, y_coeff_var_indices_,
     z_coeffs_, z_coeff_var_indices_,
     unprep1_coeffs, unprep1_coeff_var_indices,
     unprep2_coeffs, unprep2_coeff_var_indices) = get_coeffs(n, ux, uy, uz, resolution)

    def ladder(qc, pos_qubits, coeffs, var_indices, work, partner, negate=False):
        # CX(work, partner), then for every step: CX from the listed position
        # qubits onto `work` followed by CRY(angle) controlled by `partner`;
        # finally CX(work, partner) again. `var_indices` has one more entry
        # than `coeffs`, so the last step has no rotation.
        qc.cx(work, partner)
        for step, flips in enumerate(var_indices):
            for ind in flips:
                qc.cx(pos_qubits[ind], work)
            if step < len(coeffs):
                angle = -coeffs[step] if negate else coeffs[step]
                qc.cry(angle, partner, work)
        qc.cx(work, partner)

    def prep(qc, pos_qr, dir_qr):
        # Flatten once instead of once per CX.
        pos_qubits = [q for reg in pos_qr for q in reg]

        # Fixed, field-independent part on direction qubits 0, 2 and 4.
        qc.h(dir_qr[0])
        qc.h(dir_qr[4])
        qc.cx(dir_qr[0], dir_qr[2])
        qc.ry(-np.pi / 4, dir_qr[4])
        qc.cx(dir_qr[2], dir_qr[4])
        qc.ry(np.pi / 4, dir_qr[4])
        qc.cx(dir_qr[2], dir_qr[4])
        qc.ry(-np.pi / 4, dir_qr[2])
        qc.cx(dir_qr[0], dir_qr[2])
        qc.ry(np.pi / 4, dir_qr[2])
        qc.cx(dir_qr[0], dir_qr[2])
        qc.cx(dir_qr[2], dir_qr[0])

        # Position-dependent rotations, one ladder per axis.
        ladder(qc, pos_qubits, x_coeffs, x_coeff_var_indices, dir_qr[0], dir_qr[1])
        ladder(qc, pos_qubits, y_coeffs, y_coeff_var_indices, dir_qr[2], dir_qr[3])
        ladder(qc, pos_qubits, z_coeffs, z_coeff_var_indices, dir_qr[4], dir_qr[5])

    def unprep(qc, pos_qr, dir_qr):
        pos_qubits = [q for reg in pos_qr for q in reg]

        # Per-axis ladders with negated angles.
        ladder(qc, pos_qubits, x_coeffs_, x_coeff_var_indices_, dir_qr[0], dir_qr[1], negate=True)
        ladder(qc, pos_qubits, y_coeffs_, y_coeff_var_indices_, dir_qr[2], dir_qr[3], negate=True)
        ladder(qc, pos_qubits, z_coeffs_, z_coeff_var_indices_, dir_qr[4], dir_qr[5], negate=True)

        # Cross-axis ladders (positive angles), then a fixed final rotation.
        ladder(qc, pos_qubits, unprep2_coeffs, unprep2_coeff_var_indices, dir_qr[2], dir_qr[4])
        ladder(qc, pos_qubits, unprep1_coeffs, unprep1_coeff_var_indices, dir_qr[0], dir_qr[2])
        qc.ry(-2 * np.pi / 3, dir_qr[0])

    return prep, unprep
def stream(qc, pos_qr, dir_qr, n):
    """Append the controlled-phase gates of the streaming step to *qc*.

    For each axis, qubit ``2*axis`` of *dir_qr* controls a phase of
    ``+pi / 2**m`` on position qubit ``m`` and qubit ``2*axis + 1`` controls
    ``-pi / 2**m``. The caller in this file applies these between a QFT and
    an inverse QFT on the position registers.

    Parameters
    ----------
    qc : QuantumCircuit
        Circuit to append to (mutated in place).
    pos_qr : sequence of registers
        One position register per axis, each with at least *n* qubits.
    dir_qr : register
        Direction qubits, two per axis.
    n : int
        Qubits per axis.
    """
    for axis in range(dim):
        forward_ctrl = dir_qr[2 * axis]
        backward_ctrl = dir_qr[2 * axis + 1]
        for m in range(n):
            target = pos_qr[axis][m]
            qc.cp(np.pi / (2 ** m), forward_ctrl, target)
            qc.cp(-np.pi / (2 ** m), backward_ctrl, target)
def get_circuit(n, ux, uy, uz, init_state_prep_circ, T_list, vel_resolution=32,
                measure=True, flag_qubits=False, midcircuit_meas=True):
    """Build one QLBM circuit per entry of *T_list*.

    NOTE(review): statement nesting was reconstructed from a copy of the file
    whose indentation had been stripped; confirm against the upstream source.

    Parameters
    ----------
    n : int
        Qubits per spatial axis.
    ux, uy, uz : callable or str
        Velocity components. If *ux* is a string, all three are parsed with
        ``str_to_lambda``.
    init_state_prep_circ : QuantumCircuit or str
        Circuit acting on the ``3*n`` position qubits, or a name understood
        by ``get_named_init_state_circuit``.
    T_list : iterable of int
        Number of time steps for each circuit.
    vel_resolution : int
        Forwarded to ``get_coll_ops``.
    measure : bool
        Measure the position registers at the end.
    flag_qubits : bool
        Add a second direction-sized register that receives CX copies of the
        direction qubits before and after streaming.
    midcircuit_meas : bool
        If true, one direction register is reused and measured after every
        step. Otherwise each step gets its own register.

    Returns
    -------
    list of QuantumCircuit
        One circuit per entry of *T_list*, in order.
    """
    ux_str = uy_str = uz_str = None
    if isinstance(ux, str):
        ux_str, uy_str, uz_str = ux, uy, uz
        ux, uy, uz = str_to_lambda(ux_str, uy_str, uz_str)
    if isinstance(init_state_prep_circ, str):
        init_state_prep_circ = get_named_init_state_circuit(n, init_state_prep_circ)

    # A field given as strings containing none of the letters x, y, z is
    # treated as spatially uniform. This is a plain substring test, so e.g.
    # "exp(1)" counts as non-uniform, which is the conservative path. For a
    # uniform field the QFT pair is applied once around the whole evolution
    # instead of once per step.
    uniform_bool = False
    if ux_str is not None:
        combined = f"{ux_str}{uy_str}{uz_str}"
        uniform_bool = not any(letter in combined for letter in "xyz")

    prep, unprep = get_coll_ops(n, ux, uy, uz, vel_resolution)
    qft = QFT(n, inverse=False, do_swaps=False)
    inverse_qft = QFT(n, inverse=True, do_swaps=False)

    def apply_to_all_axes(circuit, block, pos_qr):
        for axis in range(dim):
            circuit.compose(block, pos_qr[axis], inplace=True)

    def copy_to_flags(circuit, dir_qr, dir_qr_flag):
        for q1, q2 in zip(dir_qr, dir_qr_flag):
            circuit.cx(q1, q2)

    qc_list = []
    for T_total in T_list:
        # Register creation order is kept as in the original so that
        # auto-generated register names do not change.
        pos_qr = [QuantumRegister(n) for _ in range(dim)]
        pos_cr = [ClassicalRegister(n) for _ in range(dim)]
        if midcircuit_meas:
            dir_qr = QuantumRegister(2 * dim)
            dir_registers = [dir_qr]
        else:
            dir_qr_list = [QuantumRegister(2 * dim) for _ in range(T_total)]
            dir_registers = dir_qr_list
        dir_qr_flag = QuantumRegister(2 * dim)

        cr_width = (4 if flag_qubits and midcircuit_meas else 2) * dim
        cr_count = T_total + int(flag_qubits and not midcircuit_meas)
        dir_cr = [ClassicalRegister(cr_width) for _ in range(cr_count)]

        flag_registers = [dir_qr_flag] if flag_qubits else []
        qc = QuantumCircuit(*pos_qr, *dir_registers, *flag_registers, *pos_cr, *dir_cr)

        qc.compose(init_state_prep_circ,
                   [qubit for qr in pos_qr for qubit in list(qr)],
                   inplace=True)

        if uniform_bool:
            apply_to_all_axes(qc, qft, pos_qr)

        # Steps are emitted with T counting down; dir_cr[T] receives the
        # direction measurement of the step labelled T.
        for T in reversed(range(T_total)):
            if not midcircuit_meas:
                dir_qr = dir_qr_list[T]
            prep(qc, pos_qr, dir_qr)
            if flag_qubits:
                copy_to_flags(qc, dir_qr, dir_qr_flag)
            if not uniform_bool:
                apply_to_all_axes(qc, qft, pos_qr)
            stream(qc, pos_qr, dir_qr, n)
            if not uniform_bool:
                apply_to_all_axes(qc, inverse_qft, pos_qr)
            if flag_qubits:
                copy_to_flags(qc, dir_qr, dir_qr_flag)
            unprep(qc, pos_qr, dir_qr)
            if flag_qubits and midcircuit_meas:
                qc.measure(list(dir_qr) + list(dir_qr_flag), dir_cr[T])
            else:
                qc.measure(dir_qr, dir_cr[T])

        if not midcircuit_meas and flag_qubits:
            # Without mid-circuit measurement the flag register is read once,
            # into the extra classical register.
            qc.measure(dir_qr_flag, dir_cr[T_total])

        if uniform_bool:
            apply_to_all_axes(qc, inverse_qft, pos_qr)

        if measure:
            for axis in range(dim):
                qc.measure(pos_qr[axis], pos_cr[axis])

        qc_list.append(qc)
    return qc_list
def str_to_lambda(vx_param, vy_param, vz_param):
    """Compile three expression strings in ``x``, ``y``, ``z`` into callables.

    Each argument is converted with ``str()``, parsed by SymPy and turned
    into a NumPy-backed function ``f(x, y, z)``.

    Returns
    -------
    tuple
        ``(vx, vy, vz)`` callables.
    """
    # SECURITY NOTE(review): sympy.sympify() evaluates its input with eval().
    # If these strings can come from an untrusted user (e.g. a web form),
    # they must be validated or parsed with a restricted parser before
    # reaching this function.
    coordinates = symbols('x y z')
    return tuple(
        lambdify(coordinates, sympify(str(expression)), modules="numpy")
        for expression in (vx_param, vy_param, vz_param)
    )
def get_named_init_state_circuit(
    n: int,
    init_state_name: str,
    # Sinusoidal parameters (frequency multipliers)
    sine_k_x: float = 1.0,
    sine_k_y: float = 1.0,
    sine_k_z: float = 1.0,
    # Gaussian parameters
    gauss_cx: float = None,     # Center X (0-1 normalized), defaults to 0.5
    gauss_cy: float = None,     # Center Y (0-1 normalized), defaults to 0.5
    gauss_cz: float = None,     # Center Z (0-1 normalized), defaults to 0.5
    gauss_sigma: float = None,  # Spread in normalized units, defaults to 1.0
    # Multi-dirac-delta parameters
    mdd_kx_log2: int = 1,  # Number of dirac-deltas along x is 2^mdd_kx_log2
    mdd_ky_log2: int = 1,  # Number of dirac-deltas along y is 2^mdd_ky_log2
    mdd_kz_log2: int = 1   # Number of dirac-deltas along z is 2^mdd_kz_log2
):
    """
    Create initial state preparation circuit with configurable parameters.

    Parameters
    ----------
    n : int
        Number of qubits per spatial dimension (grid size = 2^n per axis)
    init_state_name : str
        One of "dirac_delta", "multi_dirac_delta", "sin", "gaussian".
        Any other name yields an empty circuit (no gates).
    sine_k_x, sine_k_y, sine_k_z : float
        Frequency multipliers for sinusoidal distribution (default=1.0).
        Rounded to the nearest integer and clamped to at least 1.
    gauss_cx, gauss_cy, gauss_cz : float
        Center coordinates in [0,1] for Gaussian (default=0.5)
    gauss_sigma : float
        Spread of Gaussian in normalized units (default=1.0)
    mdd_kx_log2, mdd_ky_log2, mdd_kz_log2 : int
        log2 of the number of dirac-deltas per axis (default=1). Each must
        satisfy ``0 <= value < n``.

    Returns
    -------
    QuantumCircuit
        State preparation circuit on ``3*n`` qubits (x register first, then
        y, then z).

    Raises
    ------
    ValueError
        If a ``mdd_k*_log2`` value is outside ``[0, n)`` for
        "multi_dirac_delta"; the gate indices would otherwise land on the
        wrong qubits.
    """
    N = 2**n
    init_state_prep_circ = QuantumCircuit(3*n)

    if init_state_name == "dirac_delta":
        # Flip the highest qubit of each axis register.
        for axis in range(3):
            init_state_prep_circ.x((axis + 1) * n - 1)

    elif init_state_name == "multi_dirac_delta":
        log2_counts = (mdd_kx_log2, mdd_ky_log2, mdd_kz_log2)
        for axis_name, k in zip("xyz", log2_counts):
            if not 0 <= k < n:
                raise ValueError(
                    f"mdd_k{axis_name}_log2 must satisfy 0 <= value < n (n={n}), got {k}"
                )
        # Per axis: Hadamards on the top k qubits, X on the qubit just below.
        for axis, k in enumerate(log2_counts):
            top = (axis + 1) * n
            init_state_prep_circ.h(range(top - k, top))
            init_state_prep_circ.x(top - 1 - k)

    elif init_state_name == "sin":
        # f(x,y,z) = 1 + sin(2π * kx * x) * sin(2π * ky * y) * sin(2π * kz * z)
        kx = max(1, int(round(float(sine_k_x))))
        ky = max(1, int(round(float(sine_k_y))))
        kz = max(1, int(round(float(sine_k_z))))
        coords = np.arange(N) / N  # Normalized [0, 1)
        sin_x = np.sin(2 * np.pi * kx * coords)
        sin_y = np.sin(2 * np.pi * ky * coords)
        sin_z = np.sin(2 * np.pi * kz * coords)
        # Flat index = z*N^2 + y*N + x, built via Kronecker products.
        init_state = 1 + (
            np.kron(sin_z, np.ones(N**2)) *
            np.kron(np.ones(N**2), sin_x) *
            np.kron(np.ones(N), np.kron(sin_y, np.ones(N)))
        )
        init_state_prep_circ.prepare_state(init_state.astype(np.complex128), normalize=True)
        init_state_prep_circ = transpile(init_state_prep_circ, basis_gates=['u1', 'u2', 'u3', 'cx'])

    elif init_state_name == "gaussian":
        # f(x,y,z) = exp(-((x-cx)^2 + (y-cy)^2 + (z-cz)^2) / (2*sigma^2))
        cx = float(gauss_cx) if gauss_cx is not None else 0.5
        cy = float(gauss_cy) if gauss_cy is not None else 0.5
        cz = float(gauss_cz) if gauss_cz is not None else 0.5
        sigma = float(gauss_sigma) if gauss_sigma is not None else 1.0
        coords = np.arange(N) / N  # Normalized [0, 1)
        gauss_x = np.exp(-((coords - cx)**2) / (2 * sigma**2))
        gauss_y = np.exp(-((coords - cy)**2) / (2 * sigma**2))
        gauss_z = np.exp(-((coords - cz)**2) / (2 * sigma**2))
        # Same flat-index layout as the sinusoidal case.
        init_state = (
            np.kron(gauss_z, np.ones(N**2)) *
            np.kron(np.ones(N**2), gauss_x) *
            np.kron(np.ones(N), np.kron(gauss_y, np.ones(N)))
        )
        init_state_prep_circ.prepare_state(init_state.astype(np.complex128), normalize=True)
        init_state_prep_circ = transpile(init_state_prep_circ, basis_gates=['u1', 'u2', 'u3', 'cx'])

    return init_state_prep_circ
| ########################################################################################## | |
| from qiskit_ibm_runtime import QiskitRuntimeService, SamplerV2 as Sampler | |
| from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager | |
| import pprint | |
| # import mthree | |
| try: | |
| # Try relative import first (best for package usage) | |
| from .visualize_counts import load_samples, estimate_density, plot_density_isosurface, plot_density_isosurface_slider | |
| except ImportError: | |
| try: | |
| # Try absolute import with package prefix | |
| from qlbm.visualize_counts import load_samples, estimate_density, plot_density_isosurface, plot_density_isosurface_slider | |
| except ImportError: | |
| # Fallback to direct import (for script usage) | |
| from visualize_counts import load_samples, estimate_density, plot_density_isosurface, plot_density_isosurface_slider | |
def run_sampling_hw_ibm(
    n,
    ux,
    uy,
    uz,
    init_state_prep_circ,
    T_list,
    shots=2**14,
    vel_resolution=32,
    output_resolution=40,
    logger=None,
    flag_qubits=True,
    progress_callback=None,
):
    """
    Run QLBM simulation on IBM quantum hardware.

    Circuits are generated, transpiled for the least busy backend of the
    configured IBM Cloud instance and submitted through ``SamplerV2``. The
    call returns as soon as the job is submitted; results are collected by
    the returned callback.

    Parameters
    ----------
    n : int
        Number of qubits per spatial dimension
    ux, uy, uz : callable or str
        Velocity field components
    init_state_prep_circ : QuantumCircuit or str
        Initial state preparation circuit, or a name accepted by
        get_named_init_state_circuit()
    T_list : list[int]
        List of timesteps to simulate
    shots : int
        Number of measurement shots (default: 2^14)
    vel_resolution : int
        Resolution for velocity field discretization
    output_resolution : int
        Grid resolution for density estimation output
    logger : callable, optional
        Function to log messages; falls back to ``print``
    flag_qubits : bool
        Forwarded to get_circuit() and load_samples()
    progress_callback : callable, optional
        Function to report progress (0-100) with optional status message:
        progress_callback(percent, message)

    Returns
    -------
    job : IBMJob
        The submitted job object
    get_job_result : callable
        ``get_job_result(job)`` polls the job, then processes the results.
        Returns (output, fig).

    Raises
    ------
    MissingCredentialError
        If the ``API_KEY_IBM_QLBM`` environment variable is not set.
    """
    import time as time_module

    def log(msg):
        if logger:
            logger(str(msg))
        else:
            print(msg)

    def update_progress(percent, message=None):
        if progress_callback:
            progress_callback(percent, message)

    # === STEP 1: Circuit Generation (0-50%) ===
    log("Step 1: Generating quantum circuits...")
    update_progress(5, "Generating quantum circuits...")
    qc_list = get_circuit(n, ux, uy, uz, init_state_prep_circ, T_list, vel_resolution, flag_qubits=flag_qubits)
    log(f"Generated {len(qc_list)} circuit(s) for timesteps {T_list}")
    update_progress(15, f"Generated {len(qc_list)} circuits")

    pm_optimization_level = 3
    log("Connecting to IBM Quantum service...")
    update_progress(20, "Connecting to IBM Quantum...")
    ibm_token = _require_env("API_KEY_IBM_QLBM", context="IBM QLBM QPU execution")
    service = QiskitRuntimeService(
        channel="ibm_cloud",
        token=ibm_token,
        instance="crn:v1:bluemix:public:quantum-computing:us-east:a/15157e4350c04a9dab51b8b8a4a93c86:e29afd91-64bf-4a82-8dbf-731e6c213595::",
    )
    backend = service.least_busy()
    log(f"Selected backend: {backend.name}")
    update_progress(25, f"Backend: {backend.name}")

    # The pass manager depends only on the backend, so build it once.
    pm = generate_preset_pass_manager(backend=backend, optimization_level=pm_optimization_level)
    qc_compiled_list = []
    total_circuits = len(qc_list)
    for idx, qc in enumerate(qc_list):
        circuit_progress = 25 + (idx / total_circuits) * 20  # 25-45%
        update_progress(circuit_progress, f"Transpiling circuit {idx+1}/{total_circuits}...")
        log(f"Transpiling circuit {idx+1}/{total_circuits} via PassManager...")
        qc_compiled = pm.run(qc)
        log(f" Compiled: {qc_compiled.num_qubits} qubits, {qc_compiled.num_clbits} clbits, depth={qc_compiled.depth()}")
        qc_compiled_list.append(qc_compiled)
    log("All circuits transpiled successfully.")
    update_progress(50, "Circuits ready. Submitting job...")

    # === STEP 2: Job Submission & Monitoring (50-90%) ===
    sampler = Sampler(mode=backend)
    job = sampler.run(qc_compiled_list, shots=shots)
    job_id = job.job_id() if hasattr(job, 'job_id') else str(job)
    log(f"Job submitted! Job ID: {job_id}")
    update_progress(52, f"Job submitted: {job_id}")

    def get_job_result(j):
        """Poll job status and retrieve results with progress updates."""
        log("Monitoring job status...")
        update_progress(55, "Job queued, waiting for execution...")

        # Coarse progress value and message for each known status.
        status_progress_map = {
            'INITIALIZING': (55, "Job initializing..."),
            'QUEUED': (58, "Job queued, waiting..."),
            'VALIDATING': (62, "Validating job..."),
            'RUNNING': (70, "Job running on QPU..."),
            'DONE': (85, "Job completed!"),
            'ERROR': (85, "Job error occurred"),
            'CANCELLED': (85, "Job cancelled"),
        }
        terminal_states = ('DONE', 'ERROR', 'CANCELLED')

        last_status = None
        poll_count = 0
        max_polls = 600  # ~10 minutes with 1s interval
        # Polling only drives the progress display. When the budget runs out
        # the code still falls through to the blocking j.result() below.
        while poll_count < max_polls:
            try:
                status = j.status()
                # status may be an enum-like object or a plain string.
                status_name = status.name if hasattr(status, 'name') else str(status)
                if status_name != last_status:
                    last_status = status_name
                    progress_val, status_msg = status_progress_map.get(status_name, (60, f"Status: {status_name}"))
                    log(f"Job Status: {status_name}")
                    update_progress(progress_val, status_msg)
                if status_name in terminal_states:
                    break
                # Creep the bar forward while waiting (indeterminate feel).
                if status_name == 'QUEUED':
                    queue_progress = 58 + min(7, poll_count * 0.05)
                    update_progress(queue_progress, f"Queued... (waiting {poll_count}s)")
                elif status_name == 'RUNNING':
                    run_progress = 70 + min(15, poll_count * 0.1)
                    update_progress(run_progress, f"Running on QPU... ({poll_count}s)")
                time_module.sleep(1)
                poll_count += 1
            except Exception as e:
                # Best effort: a failed status query is logged and retried.
                log(f"Status check error: {e}")
                time_module.sleep(2)
                poll_count += 2

        log("Retrieving job results...")
        update_progress(87, "Retrieving results...")
        result = j.result()
        log("Results retrieved successfully.")
        update_progress(90, "Processing results...")

        # === STEP 3: Creating Plots (90-100%) ===
        output = []
        total_timesteps = len(T_list)
        for idx, (T_total, pub) in enumerate(zip(T_list, result)):
            plot_progress = 90 + (idx / total_timesteps) * 8  # 90-98%
            update_progress(plot_progress, f"Processing timestep T={T_total}...")
            try:
                joined = pub.join_data()
                joined_counts = joined.get_counts()
            except Exception as e:
                # NOTE(review): None is handed to load_samples on failure;
                # confirm that load_samples accepts it.
                log(f"Error retrieving counts for T={T_total}: {e}")
                joined_counts = None
            pts, counts = load_samples(joined_counts, T_total, logger=None, flag_qubits=flag_qubits)
            output.append(estimate_density(pts, counts, bandwidth=0.05, grid_size=output_resolution))
        log(f"Processing complete: {len(output)} timestep(s)")
        update_progress(98, "Creating visualization...")
        fig = plot_density_isosurface_slider(output, T_list)
        update_progress(100, "Complete!")
        log("IBM QPU job completed successfully.")
        return output, fig

    return job, get_job_result
| from qiskit_ionq import IonQProvider | |
| class MissingCredentialError(RuntimeError): | |
| """Raised when a required API key/secret is not available at runtime.""" | |
| def _require_env(var_name: str, *, context: str) -> str: | |
| """Return a required environment variable or raise a helpful error.""" | |
| value = os.environ.get(var_name) | |
| if value is None or str(value).strip() == "": | |
| raise MissingCredentialError( | |
| f"Missing required secret '{var_name}'. " | |
| f"Set it as a runtime environment variable (e.g., Hugging Face Space → Settings → Secrets) " | |
| f"before running {context}." | |
| ) | |
| return value | |
def run_sampling_hw_ionq(
    n,
    ux,
    uy,
    uz,
    init_state_prep_circ,
    T_list,
    shots=2**14,
    vel_resolution=32,
    output_resolution=40,
    logger=None,
    flag_qubits=True,
    progress_callback=None,
):
    """
    Run QLBM simulation on IonQ quantum hardware.

    Circuits are generated without mid-circuit measurement, transpiled for
    the ``qpu.forte-enterprise-1`` backend and submitted. The call returns
    as soon as the job is submitted; results are collected by the returned
    callback.

    Parameters
    ----------
    n : int
        Number of qubits per spatial dimension
    ux, uy, uz : callable or str
        Velocity field components
    init_state_prep_circ : QuantumCircuit or str
        Initial state preparation circuit, or a name accepted by
        get_named_init_state_circuit()
    T_list : list[int]
        List of timesteps to simulate
    shots : int
        Number of measurement shots (default: 2^14)
    vel_resolution : int
        Resolution for velocity field discretization
    output_resolution : int
        Grid resolution for density estimation output
    logger : callable, optional
        Function to log messages; falls back to ``print``
    flag_qubits : bool
        Forwarded to get_circuit() and load_samples()
    progress_callback : callable, optional
        Function to report progress (0-100) with optional status message:
        progress_callback(percent, message)

    Returns
    -------
    job : IonQ Job
        The submitted job object
    get_job_result : callable
        ``get_job_result(job)`` polls the job, then processes the results.
        Returns (output, fig).

    Raises
    ------
    MissingCredentialError
        If the ``API_KEY_IONQ_QLBM`` environment variable is not set.
    """
    import time as time_module

    def log(msg):
        if logger:
            logger(str(msg))
        else:
            print(msg)

    def update_progress(percent, message=None):
        if progress_callback:
            progress_callback(percent, message)

    # === STEP 1: Circuit Generation (0-50%) ===
    log("Step 1: Generating quantum circuits...")
    update_progress(5, "Generating quantum circuits...")

    # Ensure credentials are present and expose them under the environment
    # variable name qiskit-ionq reads by default. setdefault() leaves an
    # already-set IONQ_API_TOKEN untouched.
    ionq_token = _require_env("API_KEY_IONQ_QLBM", context="IonQ QLBM QPU execution")
    os.environ.setdefault("IONQ_API_TOKEN", ionq_token)
    provider = IonQProvider()
    backend = provider.get_backend("qpu.forte-enterprise-1")
    # backend.name is a property in some versions and a method in others.
    backend_name = backend.name if isinstance(backend.name, str) else backend.name()
    log(f"Selected IonQ backend: {backend_name}")
    update_progress(15, f"Backend: {backend_name}")

    qc_list = get_circuit(n, ux, uy, uz, init_state_prep_circ, T_list, vel_resolution,
                          flag_qubits=flag_qubits, midcircuit_meas=False)
    log(f"Generated {len(qc_list)} circuit(s) for timesteps {T_list}")
    update_progress(45, f"Generated {len(qc_list)} circuits")

    # Transpile circuits for IonQ with optimization_level=1 (recommended by IonQ)
    log("Transpiling circuits for IonQ (optimization_level=1)...")
    qc_list_transpiled = transpile(qc_list, backend=backend, optimization_level=1)
    update_progress(48, "Circuits transpiled")

    # === STEP 2: Job Submission (50%) ===
    log("Submitting job to IonQ...")
    update_progress(50, "Submitting job to IonQ...")
    job = backend.run(qc_list_transpiled, shots=shots)
    job_id = job.job_id() if hasattr(job, 'job_id') else str(job)
    log(f"Job submitted! Job ID: {job_id}")
    update_progress(52, f"Job submitted: {job_id}")

    def get_job_result(j):
        """Poll job status and retrieve results with progress updates."""
        log("Monitoring IonQ job status...")
        update_progress(55, "Job queued, waiting for execution...")

        terminal_states = ('DONE', 'ERROR', 'CANCELLED')
        last_status = None
        poll_count = 0
        max_polls = 60000  # ~16.7 hours with 1s interval
        while poll_count < max_polls:
            try:
                status = j.status()
                # status may be an enum-like object or a plain string.
                status_name = status.name if hasattr(status, 'name') else str(status)
                if status_name != last_status:
                    # Report each status transition once.
                    last_status = status_name
                    log(f"Job Status: {status_name}")
                    if status_name in ('QUEUED', 'VALIDATING'):
                        update_progress(58, f"Status: {status_name}")
                    elif status_name == 'RUNNING':
                        update_progress(70, "Job running on IonQ QPU...")
                    elif status_name == 'DONE':
                        update_progress(85, "Job completed!")
                        break
                    elif status_name in ('ERROR', 'CANCELLED'):
                        update_progress(85, f"Job {status_name.lower()}")
                        break
                # Creep the bar forward while waiting.
                if status_name == 'QUEUED':
                    queue_progress = 58 + min(7, poll_count * 0.05)
                    update_progress(queue_progress, f"Queued... (waiting {poll_count}s)")
                elif status_name == 'RUNNING':
                    run_progress = 70 + min(15, poll_count * 0.1)
                    update_progress(run_progress, f"Running on IonQ... ({poll_count}s)")
                if status_name in terminal_states:
                    break
                time_module.sleep(1)
                poll_count += 1
            except Exception as e:
                # Best effort: a failed status query is logged and retried.
                log(f"Status check error: {e}")
                time_module.sleep(2)
                poll_count += 2

        log("Retrieving IonQ job results...")
        update_progress(87, "Retrieving results...")

        # === STEP 3: Creating Plots (90-100%) ===
        update_progress(90, "Processing results...")
        output = []
        total_timesteps = len(T_list)
        for i, T_total in enumerate(T_list):
            plot_progress = 90 + (i / total_timesteps) * 8  # 90-98%
            update_progress(plot_progress, f"Processing timestep T={T_total}...")
            counts = j.get_counts(i)
            pts, counts = load_samples(counts, T_total, logger=None, flag_qubits=flag_qubits, midcircuit_meas=False)
            output.append(estimate_density(pts, counts, bandwidth=0.05, grid_size=output_resolution))
        log(f"Processing complete: {len(output)} timestep(s)")
        update_progress(98, "Creating visualization...")
        fig = plot_density_isosurface_slider(output, T_list)
        update_progress(100, "Complete!")
        log("IonQ job completed successfully.")
        return output, fig

    return job, get_job_result
| from qiskit_aer import AerSimulator | |
def _find_zero_branch_key(statevector_keys):
    """Return the key of the all-zero classical-memory branch, or None."""
    for key in statevector_keys:
        if '0x' in key:
            # Hex-encoded memory value such as '0x0'.
            if int(key[2:], 16) == 0:
                return key
        elif key.replace('0', '') == '':
            # Plain bitstring consisting only of zeros.
            return key
    return None


def _phase_aligned_density(state, n, phase_tol=1e-12):
    """Strip the global phase from the first (2**n)**dim amplitudes and reshape them to a grid."""
    amplitudes = np.array(state)[:(2**n)**dim]
    reference = np.mean(amplitudes)
    if np.abs(reference) <= phase_tol:
        # A zero-mean field would make the phase estimate 0/0 (NaN output);
        # fall back to the phase of the largest-magnitude amplitude.
        reference = amplitudes[np.argmax(np.abs(amplitudes))]
    if np.abs(reference) <= phase_tol:
        sv_phase = 1.0  # all-zero state: nothing to rotate
    else:
        sv_phase = reference / np.abs(reference)
    real_amplitudes = np.real(amplitudes / sv_phase)
    return np.reshape(real_amplitudes, tuple(2**n for _ in range(dim)))


def run_sampling_sim(
    n,
    ux,
    uy,
    uz,
    init_state_prep_circ,
    T_list,
    vel_resolution=32,
    progress_callback=None,
):
    """
    Run QLBM simulation on local Aer statevector simulator.

    Parameters
    ----------
    n : int
        Number of qubits per spatial dimension
    ux, uy, uz : callable or str
        Velocity field components
    init_state_prep_circ : QuantumCircuit or str
        Initial state preparation circuit, or the name of a preset. It is
        forwarded unchanged to ``get_circuit``. When a name is given, the
        T=0 snapshot is built from the named preset with default parameters.
    T_list : list[int]
        List of timesteps to simulate
    vel_resolution : int
        Resolution for velocity field discretization
    progress_callback : callable, optional
        Called with an integer percentage before each circuit is simulated

    Returns
    -------
    output : list[ndarray]
        List of density arrays of shape ``(2**n,) * dim``, one per circuit
    fig : go.Figure
        Plotly figure with slider animation through all timesteps
        (includes T=0 snapshot when available)
    """
    is_named_preset = isinstance(init_state_prep_circ, str)
    init_state_label = init_state_prep_circ if is_named_preset else "custom"

    # Best-effort T=0 snapshot; a failure here only drops the initial frame.
    initial_snapshot = None
    try:
        initial_snapshot = show_initial_distribution(
            n=n,
            init_state_name=str(init_state_label),
            plot=False,
            return_data=True,
            # A preset name is not a circuit: pass None so the named preset
            # is built instead of calling .copy() on a string.
            init_state_circuit=None if is_named_preset else init_state_prep_circ,
        )
    except Exception as exc:
        print(f"Warning: Unable to compute initial distribution snapshot: {exc}")

    qc_list = get_circuit(n, ux, uy, uz, init_state_prep_circ, T_list, vel_resolution, measure=False)
    backend = AerSimulator(method='statevector')

    output = []
    total_steps = len(qc_list)
    max_attempts = 100
    for i, qc in enumerate(qc_list):
        if progress_callback:
            percent = int((i / total_steps) * 100)
            progress_callback(percent)

        # Save one statevector per classical-memory outcome so the branch
        # with all-zero memory can be selected afterwards.
        qc.save_statevector(conditional=True)

        # Try multiple shots to find a successful (zero-branch) outcome
        for attempt in range(max_attempts):
            job = backend.run(qc, memory=True, shots=1)
            data_all = job.result().data()
            statevector_keys = list(dict(data_all['statevector']).keys())
            zero_key = _find_zero_branch_key(statevector_keys)
            if zero_key is not None:
                break
            if attempt < max_attempts - 1:
                print(f"Attempt {attempt + 1} failed (got branch {statevector_keys[0]}), retrying...")
        else:
            # If all attempts failed, use the first available branch with a warning
            print(f"Warning: Could not get zero branch after {max_attempts} attempts. Using first available branch.")
            zero_key = statevector_keys[0]

        output.append(_phase_aligned_density(data_all['statevector'][zero_key], n))

    # Create meshgrid for coordinates (used for plotting)
    x_coords = np.linspace(0, 1, 2**n)
    X = np.meshgrid(x_coords, x_coords, x_coords, indexing='ij')

    outputs_for_plot = output.copy()
    times_for_plot = list(T_list)
    if initial_snapshot is not None:
        initial_density, _ = initial_snapshot
        if times_for_plot and times_for_plot[0] == 0:
            # T=0 was requested explicitly: show the exact initial state.
            outputs_for_plot[0] = initial_density
        else:
            outputs_for_plot = [initial_density] + outputs_for_plot
            times_for_plot = [0] + times_for_plot

    # Create figure with slider for all timesteps (including T=0 if available)
    fig = _create_slider_figure(outputs_for_plot, times_for_plot, X)
    return output, fig
def _create_slider_figure(output_list, T_list, X):
    """
    Build an isosurface figure with one trace per timestep and a slider.

    Exactly one trace is visible at a time; each slider step switches the
    visible trace and updates the title (no Plotly frames are used).

    Parameters
    ----------
    output_list : list[ndarray]
        Density arrays, one per timestep
    T_list : list[int]
        Timestep values, paired with ``output_list`` in order
    X : tuple of ndarrays
        Meshgrid coordinates; ``X[2]``, ``X[1]``, ``X[0]`` are used as the
        plot's x, y, z coordinates respectively

    Returns
    -------
    fig : go.Figure
        Figure containing the traces and the slider
    """
    # A single colour range shared by every trace keeps timesteps comparable.
    global_min = min(map(np.min, output_list))
    global_max = max(map(np.max, output_list))

    # The coordinates are identical for every trace, so flatten them once.
    xs = X[2].flatten()
    ys = X[1].flatten()
    zs = X[0].flatten()

    fig = go.Figure()
    for idx, (density, timestep) in enumerate(zip(output_list, T_list)):
        trace = go.Isosurface(
            x=xs,
            y=ys,
            z=zs,
            value=density.flatten(),
            isomin=global_min,
            isomax=global_max,
            opacity=0.4,
            surface_count=10,
            caps=dict(x_show=False, y_show=False, z_show=False),
            colorscale=QLBM_PLOT_COLORSCALE,
            colorbar=dict(title="Density"),
            visible=(idx == 0),  # start with only the first timestep shown
            name=f"T={timestep}",
        )
        fig.add_trace(trace)

    # One slider step per timestep: show trace idx, hide all the others.
    trace_count = len(output_list)
    steps = []
    for idx, timestep in enumerate(T_list):
        visibility = [False] * trace_count
        visibility[idx] = True
        steps.append({
            "method": "update",
            "args": [
                {"visible": visibility},
                {"title": f"QLBM Simulation - Timestep T={timestep}"},
            ],
            "label": str(timestep),
        })

    slider = {
        "active": 0,
        "currentvalue": {"prefix": "Timestep: "},
        "pad": {"t": 50},
        "steps": steps,
    }
    scene = {
        "xaxis_title": "X",
        "yaxis_title": "Y",
        "zaxis_title": "Z",
        "aspectmode": 'cube',
    }
    fig.update_layout(
        title=f"QLBM Simulation - Timestep T={T_list[0]}",
        scene=scene,
        sliders=[slider],
    )
    return fig
def show_initial_distribution(
    n: int,
    init_state_name: str = "sin",
    # Sinusoidal parameters (frequency multipliers)
    sine_k_x: float = 1.0,
    sine_k_y: float = 1.0,
    sine_k_z: float = 1.0,
    # Gaussian parameters
    gauss_cx: float = None,
    gauss_cy: float = None,
    gauss_cz: float = None,
    gauss_sigma: float = None,
    # Multi-dirac-delta parameters
    mdd_kx_log2: int = 1,
    mdd_ky_log2: int = 1,
    mdd_kz_log2: int = 1,
    # Display options
    plot: bool = True,
    return_data: bool = False,
    init_state_circuit=None,
):
    """
    Simulate a state-preparation circuit and expose its amplitudes as a 3D field.

    The circuit is either the one supplied via ``init_state_circuit`` or one
    built by ``get_named_init_state_circuit`` from the named preset. It is
    run on the Aer statevector simulator, and the real part of the resulting
    statevector is reshaped to an ``(N, N, N)`` grid with ``N = 2**n``.

    Parameters
    ----------
    n : int
        Number of qubits per spatial dimension (grid size = 2^n per axis)
    init_state_name : str
        Preset name forwarded to ``get_named_init_state_circuit``; the
        printed summary recognises "sin", "gaussian" and "multi_dirac_delta"
    sine_k_x, sine_k_y, sine_k_z : float
        Frequency multipliers forwarded for the sinusoidal preset
    gauss_cx, gauss_cy, gauss_cz : float
        Gaussian centre, forwarded as given; the printed summary shows 0.5
        when a value is None
    gauss_sigma : float
        Gaussian spread, forwarded as given; the printed summary shows 0.2
        when it is None
    mdd_kx_log2, mdd_ky_log2, mdd_kz_log2 : int
        log2 of the per-axis multipliers for the multi-dirac-delta preset
    plot : bool
        When True, print summary statistics and show an isosurface plot
    return_data : bool
        When True, return the grid and its coordinates
    init_state_circuit : QuantumCircuit, optional
        Pre-built circuit to evaluate. When provided, the preset name and its
        parameters are not used to build a circuit.

    Returns
    -------
    (C, X) if ``return_data`` is True, where ``C`` is the ``(N, N, N)`` array
    and ``X`` the meshgrid coordinate arrays; otherwise None.
    """
    N = 2**n

    # Pick the circuit: a caller-supplied one wins over the named presets.
    if init_state_circuit is None:
        preset_options = dict(
            sine_k_x=sine_k_x,
            sine_k_y=sine_k_y,
            sine_k_z=sine_k_z,
            gauss_cx=gauss_cx,
            gauss_cy=gauss_cy,
            gauss_cz=gauss_cz,
            gauss_sigma=gauss_sigma,
            mdd_kx_log2=mdd_kx_log2,
            mdd_ky_log2=mdd_ky_log2,
            mdd_kz_log2=mdd_kz_log2,
        )
        init_state_prep_circ = get_named_init_state_circuit(n, init_state_name, **preset_options)
    else:
        init_state_prep_circ = init_state_circuit.copy()

    # Work on a copy so the save instruction is not added to the source circuit.
    backend = AerSimulator(method='statevector')
    qc = init_state_prep_circ.copy()
    qc.save_statevector()
    result = backend.run(qc, shots=1).result()

    # Only the real part of the amplitudes is kept.
    amplitudes = np.real(np.array(result.get_statevector()))
    C = np.reshape(amplitudes, (N, N, N))

    axis = np.linspace(0, 1, N)
    X = np.meshgrid(axis, axis, axis, indexing='ij')

    if plot:
        print(f"Initial distribution: {init_state_name}")
        print(f"Grid size: {N} x {N} x {N}")

        # Preset-specific parameter summary.
        if init_state_name == "sin":
            print(f"Sine frequencies: kx={sine_k_x}, ky={sine_k_y}, kz={sine_k_z}")
        elif init_state_name == "gaussian":
            cx, cy, cz, sigma = (
                fallback if given is None else float(given)
                for given, fallback in (
                    (gauss_cx, 0.5),
                    (gauss_cy, 0.5),
                    (gauss_cz, 0.5),
                    (gauss_sigma, 0.2),
                )
            )
            print(f"Gaussian center: ({cx}, {cy}, {cz}), sigma={sigma}")
        elif init_state_name == "multi_dirac_delta":
            print(f"Multi-Dirac-Delta: kx_log2={mdd_kx_log2}, ky_log2={mdd_ky_log2}, kz_log2={mdd_kz_log2}")
            print(f" Number of peaks: {2**mdd_kx_log2} x {2**mdd_ky_log2} x {2**mdd_kz_log2}")

        print("Distribution stats:")
        print(f" Min: {np.min(C):.6f}, Max: {np.max(C):.6f}")
        print(f" Mean: {np.mean(C):.6f}, Std: {np.std(C):.6f}")

        values = C.flatten()
        Cmin = np.min(values)
        Cmax = np.max(values)
        surface = go.Isosurface(
            x=X[2].flatten(),
            y=X[1].flatten(),
            z=X[0].flatten(),
            value=values,
            isomin=Cmin,
            isomax=Cmax,
            opacity=0.4,
            surface_count=10,
            caps=dict(x_show=False, y_show=False, z_show=False),
            colorscale=QLBM_PLOT_COLORSCALE,
        )
        fig = go.Figure(data=surface)
        fig.update_layout(
            title=f"Initial Distribution: {init_state_name}",
            scene=dict(
                xaxis_title="X",
                yaxis_title="Y",
                zaxis_title="Z",
            ),
        )
        fig.show()

    return (C, X) if return_data else None
if __name__=="__main__":
    # Example driver: submit a small QLBM run to IonQ and display the result.
    n=3

    # Alternative 1: build the initial-state circuit explicitly.
    # init_state_prep_circ = get_named_init_state_circuit(
    #     n=n,
    #     init_state_name="multi_dirac_delta",  # or "gaussian", "dirac_delta"
    # )

    # Alternative 2: run on the local simulator instead of hardware.
    # output, fig = run_sampling_sim(
    #     n=n,
    #     ux="sin(-2*pi*z)",
    #     uy="1",
    #     uz="sin(2*pi*x)",
    #     init_state_prep_circ="multi_dirac_delta",
    #     T_list=[1,3,5,7,9],
    #     vel_resolution=16
    # )
    # print(output)
    # fig.show(renderer="browser")

    # Optional: preview the initial distribution.
    # show_initial_distribution(n=n, init_state_name="sin", sine_k_x=1, sine_k_y=1, sine_k_z=1)

    # Hardware run. Note that the initial state is given here as a preset
    # name, not as a pre-built circuit.
    ionq_run_settings = dict(
        n=n,
        ux="1",
        uy="1",
        uz="1",
        init_state_prep_circ="multi_dirac_delta",
        T_list=[1,2],
        shots=2**15,
        vel_resolution=2,
        output_resolution=16,
    )
    job, get_job_result = run_sampling_hw_ionq(**ionq_run_settings)

    # Wait for the job to finish, then post-process and plot.
    output,fig = get_job_result(job)
    fig.show(renderer="browser")