# harishaseebat92
# Initial upload of source code
# 7bf5c9d
import os
import math
import tempfile
import gradio as gr
import cudaq
import numpy as np
import cupy as cp
from pathlib import Path
# --- Plotly Imports ---
import plotly.graph_objects as go
import plotly.io as pio
import imageio # For compiling GIF from frames
# Turn off MathJax loading in the Kaleido export scope (can speed up static
# image export when no LaTeX rendering is needed).
# The AttributeError guard makes this a best-effort tweak: if the attribute
# chain `pio.kaleido.scope.mathjax` cannot be resolved or assigned, the
# setting is silently skipped.
try:
    pio.kaleido.scope.mathjax = None # Can speed up Kaleido if MathJax not needed
except AttributeError:
    pass
def simulate_qlbm_and_animate(num_reg_qubits: int, T: int, distribution_type: str, ux_input: float, uy_input: float):
    """
    Simulates a 2D advection-diffusion problem using a Quantum Lattice Boltzmann Method (QLBM)
    and generates a GIF animation of the simulation's evolution using Plotly.

    Args:
        num_reg_qubits: Number of qubits per spatial axis. The grid has
            ``2**num_reg_qubits`` points per side and the simulated register
            has ``2 * num_reg_qubits + 3`` qubits in total.
        T: Number of timesteps to evolve. ``T == 0`` only stores and plots
            the initial state.
        distribution_type: ``"Sine Wave (Original)"``, ``"Gaussian"`` or
            ``"Random"``. Any other value prints a warning and falls back to
            the sine-wave profile.
        ux_input: Advection velocity passed to the model as ``ux``.
        uy_input: Advection velocity passed to the model as ``uy``.

    Returns:
        The path (``str``) of the written GIF,
        ``Results/<run-specific folder>/animation_plotly.gif``, or ``None``
        when any stage fails (failures are reported with ``print``).

    Side effects:
        Sets the CUDA-Q target to ``'nvidia'`` with ``option='fp64'``,
        registers the custom operation ``"prep_op"``, writes intermediate
        ``.npy`` / ``.png`` files into a temporary directory and creates the
        output folder under ``Results``.
    """
    # --- Frame bookkeeping -------------------------------------------------
    # With simulation_fps = 0.1 roughly one snapshot is kept per ten timesteps.
    video_length = T
    simulation_fps = 0.1
    frames = int(simulation_fps * video_length)
    if frames == 0:
        frames = 1
    # --- Register sizes ----------------------------------------------------
    num_anc = 3  # qubits of the direction register; also sizes the padded weight vector below
    num_qubits_total = 2 * num_reg_qubits + num_anc
    current_N = 2**num_reg_qubits  # grid points per axis
    N_tot_state_vector = 2**num_qubits_total
    # Single-process run: one "rank" that owns the whole state vector.
    num_ranks = 1
    rank = 0
    N_sub_per_rank = int(N_tot_state_vector // num_ranks)
    # A snapshot is written every `timesteps_per_frame` steps (never less than 1).
    timesteps_per_frame = 1
    if frames < T and frames > 0:
        timesteps_per_frame = int(T / frames)
    if timesteps_per_frame == 0:
        timesteps_per_frame = 1
    # --- Initial density profile ------------------------------------------
    # Each lambda takes (x, y, N_val_func) and returns the density on the grid.
    if distribution_type == "Sine Wave (Original)":
        selected_initial_state_function_raw = lambda x, y, N_val_func: \
            np.sin(x * 2 * np.pi / N_val_func) * (1 - 0.5 * x / N_val_func) * \
            np.sin(y * 4 * np.pi / N_val_func) * (1 - 0.5 * y / N_val_func) + 1
    elif distribution_type == "Gaussian":
        # Gaussian centred at (N/2, N/2) with sigma = N/5, scaled into [0.2, 2.0].
        selected_initial_state_function_raw = lambda x, y, N_val_func: \
            np.exp(-((x - N_val_func / 2)**2 / (2 * (N_val_func / 5)**2) +
                     (y - N_val_func / 2)**2 / (2 * (N_val_func / 5)**2))) * 1.8 + 0.2
    elif distribution_type == "Random":
        # Uniform noise in [0.2, 1.7). Accepts either an int `x` (shape taken
        # from N_val_func) or a 2-D array `x` (shape taken from x.shape).
        selected_initial_state_function_raw = lambda x, y, N_val_func: \
            np.random.rand(N_val_func, N_val_func) * 1.5 + 0.2 if isinstance(x, int) else \
            np.random.rand(x.shape[0], x.shape[1]) * 1.5 + 0.2
    else:
        print(f"Warning: Unknown distribution type '{distribution_type}'. Defaulting to Sine Wave.")
        selected_initial_state_function_raw = lambda x, y, N_val_func: \
            np.sin(x * 2 * np.pi / N_val_func) * (1 - 0.5 * x / N_val_func) * \
            np.sin(y * 4 * np.pi / N_val_func) * (1 - 0.5 * y / N_val_func) + 1
    # Evaluate the chosen profile and zero out any point with y >= current_N.
    initial_state_func_eval = lambda x_coords, y_coords: \
        selected_initial_state_function_raw(x_coords, y_coords, current_N) * \
        (y_coords < current_N).astype(int)
    # All intermediate snapshots (.npy) and rendered frames (.png) live in a
    # temporary directory that is removed when this block exits.
    with tempfile.TemporaryDirectory() as tmp_npy_dir:
        intermediate_folder_path = Path(tmp_npy_dir)
        cudaq.set_target('nvidia', option='fp64')

        # Kernel that only allocates `num_qubits_alloc` qubits and applies no
        # gates; used below to obtain a state object of the right size.
        @cudaq.kernel
        def alloc_kernel(num_qubits_alloc: int):
            qubits = cudaq.qvector(num_qubits_alloc)

        from cupy.cuda.memory import MemoryPointer, UnownedMemory

        def to_cupy_array(state):
            """Wrap the tensor memory of a CUDA-Q state as a 1-D CuPy array.

            No data is copied: the returned array points at the address
            reported by ``state.getTensor().data()``, and ``state`` is
            recorded as the owner of that memory. Writes through the array
            therefore modify the state in place.
            """
            tensor = state.getTensor()
            pDevice = tensor.data()
            sizeByte = tensor.get_num_elements() * tensor.get_element_size()
            mem = UnownedMemory(pDevice, sizeByte, owner=state)
            memptr_obj = MemoryPointer(mem, 0)
            # dtype is hard-coded to complex128 -- presumably matches the
            # 'fp64' target option set above; TODO confirm.
            cupy_array_val = cp.ndarray(tensor.get_num_elements(),
                                        dtype=cp.complex128,
                                        memptr=memptr_obj)
            return cupy_array_val

        class QLBMAdvecDiffD2Q5_new:
            """D2Q5 (2 dimensions, 5 directions) advection-diffusion QLBM model.

            Relies on names from the enclosing function (``num_anc``,
            ``num_reg_qubits``, ``rank``, ``num_ranks``, ``N_sub_per_rank``,
            ``N_tot_state_vector``, ``current_N``, ``timesteps_per_frame``,
            ``intermediate_folder_path``, ``downsampling_factor``).
            """

            def __init__(self, ux=0.2, uy=0.15) -> None:
                """Store the lattice constants and build the kernels.

                Args:
                    ux: Velocity used for directions 1 and 2.
                    uy: Velocity used for directions 3 and 4.
                """
                self.dim = 2
                self.ndir = 5
                self.nq_dir = math.ceil(np.log2(self.ndir))  # 3 qubits encode 5 directions
                # Binary labels of the directions, zero-padded to nq_dir characters.
                self.dirs = []
                for dir_int in range(self.ndir):
                    dir_bin = f"{dir_int:b}".zfill(self.nq_dir)
                    self.dirs.append(dir_bin)
                self.e_unitvec = np.array([0, 1, -1, 1, -1])
                self.wts = np.array([2/6, 1/6, 1/6, 1/6, 1/6])
                self.cs = 1 / np.sqrt(3)
                self.ux = ux
                self.uy = uy
                self.u = np.array([0, self.ux, self.ux, self.uy, self.uy])
                # Per-direction coefficients w_i * (1 + e_i * u_i / cs^2).
                # NOTE(review): with cs^2 = 1/3 a coefficient becomes negative
                # as soon as |ux| or |uy| exceeds 1/3; create_circuit() takes a
                # square root of these values, which then yields NaN.
                self.wtcoeffs = np.multiply(self.wts, 1 + self.e_unitvec * self.u / self.cs**2)
                self.create_circuit()

            def create_circuit(self):
                """Register ``prep_op`` and define the kernels of one timestep.

                Sets ``self.dirs_i_list`` and ``self.run_timestep``.
                """
                # Build the matrix U_prep = 2 v v^T - I from the padded,
                # square-rooted weights (with 1 added to the first entry and
                # the vector normalised).
                v = np.pad(self.wtcoeffs, (0, 2**num_anc - self.ndir))
                v = v**0.5
                v[0] += 1
                v = v / np.linalg.norm(v)
                U_prep = 2 * np.outer(v, v) - np.eye(len(v))
                cudaq.register_operation("prep_op", U_prep)

                def collisionOp(dirs_list):
                    """Flatten the direction bit-strings into one list of ints.

                    Each string is converted character by character and
                    appended in reversed order.
                    """
                    dirs_i_list_val = []
                    for dir_str in dirs_list:
                        dirs_i = [(int(c)) for c in dir_str]
                        dirs_i_list_val += dirs_i[::-1]
                    return dirs_i_list_val

                self.dirs_i_list = collisionOp(self.dirs)

                # Cascade of (multi-)controlled X gates on q[0:n], starting
                # with the target q[n-1] controlled on all lower qubits and
                # ending with a plain X on q[0].
                # Presumably a +1 (mod 2**n) shift with q[0] as the
                # least-significant bit -- TODO confirm bit ordering.
                @cudaq.kernel
                def rshift(q: cudaq.qview, n: int):
                    for i in range(n):
                        if i == n - 1:
                            x(q[n - 1 - i])
                        elif i == n - 2:
                            x.ctrl(q[n - 1 - (i + 1)], q[n - 1 - i])
                        else:
                            x.ctrl(q[0:n - 1 - i], q[n - 1 - i])

                # Same gate set as rshift applied in the opposite order:
                # X on q[0] first, then targets q[1], q[2], ... each
                # controlled on all lower qubits.
                # Presumably the inverse (-1 mod 2**n) shift -- TODO confirm.
                @cudaq.kernel
                def lshift(q: cudaq.qview, n: int):
                    for i in range(n):
                        if i == 0:
                            x(q[0])
                        elif i == 1:
                            x.ctrl(q[0], q[1])
                        else:
                            x.ctrl(q[0:i], q[i])

                # One streaming step. For each of the four moving directions
                # the pattern is the same:
                #   1. read that direction's bits from dirs_i_val,
                #   2. apply X to every direction qubit whose bit is 0,
                #   3. apply the shift to qx or qy, controlled on qdir,
                #   4. undo the X gates of step 2.
                # Direction index -> operation: 2 -> lshift on qx, 1 -> rshift
                # on qx, 4 -> lshift on qy, 3 -> rshift on qy.
                @cudaq.kernel
                def d2q5_tstep(q: cudaq.qview, nqx: int, nqy: int, nq_dir_val: int, dirs_i_val: list[int]):
                    qx = q[0:nqx]
                    qy = q[nqx:nqx + nqy]
                    qdir = q[nqx + nqy:nqx + nqy + nq_dir_val]
                    idx_lqx = 2
                    b_list = dirs_i_val[idx_lqx * nq_dir_val:(idx_lqx + 1) * nq_dir_val]
                    for j in range(nq_dir_val):
                        if b_list[j] == 0: x(qdir[j])
                    cudaq.control(lshift, qdir, qx, nqx)
                    for j in range(nq_dir_val):
                        if b_list[j] == 0: x(qdir[j])
                    idx_rqx = 1
                    b_list = dirs_i_val[idx_rqx * nq_dir_val:(idx_rqx + 1) * nq_dir_val]
                    for j in range(nq_dir_val):
                        if b_list[j] == 0: x(qdir[j])
                    cudaq.control(rshift, qdir, qx, nqx)
                    for j in range(nq_dir_val):
                        if b_list[j] == 0: x(qdir[j])
                    idx_lqy = 4
                    b_list = dirs_i_val[idx_lqy * nq_dir_val:(idx_lqy + 1) * nq_dir_val]
                    for j in range(nq_dir_val):
                        if b_list[j] == 0: x(qdir[j])
                    cudaq.control(lshift, qdir, qy, nqy)
                    for j in range(nq_dir_val):
                        if b_list[j] == 0: x(qdir[j])
                    idx_rqy = 3
                    b_list = dirs_i_val[idx_rqy * nq_dir_val:(idx_rqy + 1) * nq_dir_val]
                    for j in range(nq_dir_val):
                        if b_list[j] == 0: x(qdir[j])
                    cudaq.control(rshift, qdir, qy, nqy)
                    for j in range(nq_dir_val):
                        if b_list[j] == 0: x(qdir[j])

                # Full timestep starting from an existing state:
                # prep_op -> streaming -> prep_op on the direction register.
                @cudaq.kernel
                def d2q5_tstep_wrapper(state_arg: cudaq.State, nqx: int, nqy: int, nq_dir_val: int, dirs_i_val: list[int]):
                    q = cudaq.qvector(state_arg)
                    qdir = q[nqx + nqy:nqx + nqy + nq_dir_val]
                    prep_op(qdir[2], qdir[1], qdir[0]) # Assumes nq_dir_val is always 3
                    d2q5_tstep(q, nqx, nqy, nq_dir_val, dirs_i_val)
                    prep_op(qdir[2], qdir[1], qdir[0])

                # Same timestep, followed by a Hadamard on every qy qubit.
                # Takes the state as a list of amplitudes rather than a
                # cudaq.State.
                @cudaq.kernel
                def d2q5_tstep_wrapper_hadamard(vec_arg: list[complex], nqx: int, nqy: int, nq_dir_val: int, dirs_i_val: list[int]):
                    q = cudaq.qvector(vec_arg)
                    qdir = q[nqx + nqy:nqx + nqy + nq_dir_val]
                    qy = q[nqx:nqx + nqy]
                    prep_op(qdir[2], qdir[1], qdir[0])
                    d2q5_tstep(q, nqx, nqy, nq_dir_val, dirs_i_val)
                    prep_op(qdir[2], qdir[1], qdir[0])
                    for i in range(nqy):
                        h(qy[i])

                # MODIFIED run_timestep_func for memory optimization
                def run_timestep_func(vec_arg, hadamard=False):
                    """Advance the state by one timestep and return the new state.

                    After the kernel runs, every amplitude at index
                    ``N_tot_state_vector / 2**num_anc`` or above is set to
                    zero directly in the result's memory.

                    NOTE(review): run_evolution() passes a state object as
                    ``vec_arg`` in both modes, while the Hadamard kernel is
                    annotated ``list[complex]``; the ``hadamard=True`` path is
                    not exercised by this file -- confirm before enabling it.
                    """
                    if hadamard:
                        result = cudaq.get_state(d2q5_tstep_wrapper_hadamard, vec_arg, num_reg_qubits, num_reg_qubits, self.nq_dir, self.dirs_i_list)
                    else:
                        result = cudaq.get_state(d2q5_tstep_wrapper, vec_arg, num_reg_qubits, num_reg_qubits, self.nq_dir, self.dirs_i_list)
                    # Fraction of ranks that hold data after truncation; with
                    # num_ranks = 1 and num_anc = 3 this is 1/8.
                    num_nonzero_ranks = num_ranks / (2**num_anc)
                    rank_slice_cupy = to_cupy_array(result) # This is a cupy.ndarray view
                    if rank >= num_nonzero_ranks and num_nonzero_ranks > 0:
                        # Multi-rank leftover: zero the whole slice of a rank
                        # that should hold no data. With num_ranks = 1 and
                        # rank = 0 the test is 0 >= 1/8, so this branch is
                        # never taken in this file.
                        sub_sv_zeros = np.zeros(N_sub_per_rank, dtype=np.complex128)
                        cp.cuda.runtime.memcpy(rank_slice_cupy.data.ptr, sub_sv_zeros.ctypes.data, sub_sv_zeros.nbytes, cp.cuda.runtime.memcpyHostToDevice)
                    # OPTIMIZED BLOCK: Modify directly on GPU if conditions are met
                    if rank == 0 and num_nonzero_ranks < 1 and N_sub_per_rank > 0:
                        # num_nonzero_ranks = 1/8 < 1, so this condition is met for rank 0
                        limit_idx = int(N_tot_state_vector / (2**num_anc))
                        if limit_idx < rank_slice_cupy.size:
                            rank_slice_cupy[limit_idx:] = 0 # Modify directly on GPU using CuPy
                    # No explicit memcpy back to GPU needed as rank_slice_cupy is the GPU array view.
                    return result

                self.run_timestep = run_timestep_func
                # END OF MODIFIED run_timestep_func

            def write_state(self, state_to_write, t_step_str_val):
                """Save the (downsampled) real part of a state as a ``.npy`` file.

                The file is ``<t_step_str_val>_<rank>.npy`` inside the
                temporary directory. Only the first
                ``N_tot_state_vector / 2**num_anc`` amplitudes are considered;
                when they cover the full grid they are reshaped to
                ``(current_N, current_N)`` and every ``downsampling_factor``-th
                row and column is kept.

                NOTE(review): the file is opened before it is known whether
                there is anything to save, so an empty file is left behind
                when ``arr_to_save`` ends up ``None`` or empty.
                """
                rank_slice_cupy = to_cupy_array(state_to_write)
                num_nonzero_ranks = num_ranks / (2**num_anc)
                if rank < num_nonzero_ranks or (rank==0 and num_nonzero_ranks <=0):
                    save_path = intermediate_folder_path / f"{t_step_str_val}_{rank}.npy"
                    with open(save_path, 'wb') as f:
                        arr_to_save = None
                        data_limit = N_sub_per_rank
                        if num_nonzero_ranks < 1 and rank == 0: # Only part of rank 0 has data
                            data_limit = int(N_tot_state_vector / (2**num_anc))
                        if data_limit > 0:
                            relevant_part_cupy = cp.real(rank_slice_cupy[:data_limit])
                        else:
                            relevant_part_cupy = cp.array([], dtype=cp.float64)
                        if relevant_part_cupy.size >= current_N * current_N: # Enough data for full grid
                            arr_flat = relevant_part_cupy[:current_N * current_N]
                            # CORRECTED CONDITION: Removed reference to downsampled_N
                            if downsampling_factor > 1 and current_N > 0:
                                arr_reshaped = arr_flat.reshape((current_N, current_N))
                                arr_downsampled = arr_reshaped[::downsampling_factor, ::downsampling_factor]
                                arr_to_save = arr_downsampled.flatten()
                            else: # No downsampling or conditions not met for it
                                arr_to_save = arr_flat
                        elif relevant_part_cupy.size > 0 : # Partial data
                            if downsampling_factor > 1:
                                arr_to_save = relevant_part_cupy[::downsampling_factor]
                            else:
                                arr_to_save = relevant_part_cupy
                        if arr_to_save is not None and arr_to_save.size > 0:
                            # .get() copies a CuPy array to host memory before saving.
                            np.save(f, arr_to_save.get() if isinstance(arr_to_save, cp.ndarray) else arr_to_save)

            def run_evolution(self, initial_state_arg, total_timesteps, observable=False):
                """Run ``total_timesteps`` steps, writing periodic snapshots.

                A snapshot is written whenever the 1-based step number is a
                multiple of ``timesteps_per_frame``. With ``observable=True``
                the last step uses the Hadamard variant and is saved with an
                ``"_h"`` suffix. After the loop the final state is saved if no
                file for it exists yet; for ``total_timesteps == 0`` the
                initial state is saved under the name ``"0"``. The last state
                is kept in ``self.final_state``.
                """
                current_state_val = initial_state_arg
                for t_iter in range(total_timesteps):
                    next_state_val = None
                    if t_iter == total_timesteps - 1 and observable:
                        next_state_val = self.run_timestep(current_state_val, True)
                        self.write_state(next_state_val, str(t_iter + 1) + "_h")
                    else:
                        next_state_val = self.run_timestep(current_state_val)
                        if (t_iter + 1) % timesteps_per_frame == 0:
                            self.write_state(next_state_val, str(t_iter + 1))
                    if rank == 0:
                        print(f"Timestep: {t_iter + 1}/{total_timesteps}")
                    # Hand cached CuPy pool blocks back between steps.
                    cp.get_default_memory_pool().free_all_blocks()
                    current_state_val = next_state_val
                # Decide whether the final state still has to be written.
                last_saved_timestep_name = str(total_timesteps) if not observable else str(total_timesteps) + "_h"
                final_state_file_path = intermediate_folder_path / f"{last_saved_timestep_name}_{rank}.npy"
                final_T_state_file_path = intermediate_folder_path / f"{total_timesteps}_{rank}.npy"
                needs_final_save = True
                if observable and final_state_file_path.exists():
                    needs_final_save = False
                elif not observable and final_T_state_file_path.exists() and total_timesteps > 0 and total_timesteps % timesteps_per_frame == 0 :
                    needs_final_save = False
                # The next branch covers the previous one as well (same
                # outcome whenever the final file already exists).
                elif not observable and final_T_state_file_path.exists():
                    needs_final_save = False
                if needs_final_save and total_timesteps > 0 :
                    if not observable:
                        self.write_state(current_state_val, str(total_timesteps))
                elif total_timesteps == 0 and current_state_val is not None:
                    self.write_state(current_state_val, "0")
                if rank == 0:
                    print(f"Timestep: {total_timesteps}/{total_timesteps} (Evolution complete)")
                cp.get_default_memory_pool().free_all_blocks()
                self.final_state = current_state_val

        # --- END OF QLBM CLASS ---
        # Snapshots keep every 32nd grid point per axis.
        # NOTE(review): for grids smaller than 32 points per side the factor
        # is set to current_N, so write_state() keeps a single value and the
        # plotted surface is 1x1 -- confirm this is intended.
        downsampling_factor = 2**5
        if current_N == 0:
            print("Error: current_N is zero. num_reg_qubits likely too small.")
            return None
        if current_N < downsampling_factor:
            downsampling_factor = current_N if current_N > 0 else 1
        qlbm_obj = QLBMAdvecDiffD2Q5_new(ux=ux_input, uy=uy_input)
        # --- Build and upload the initial state --------------------------
        initial_state_val = cudaq.get_state(alloc_kernel, num_qubits_total)
        xv_init = np.arange(current_N)
        yv_init = np.arange(current_N)
        initial_grid_2d_X, initial_grid_2d_Y = np.meshgrid(xv_init, yv_init)
        if distribution_type == "Random":
            # Called with ints so the lambda takes its isinstance(x, int) branch.
            initial_grid_2d = selected_initial_state_function_raw(current_N, current_N, current_N) # Pass N for shape
        else:
            initial_grid_2d = initial_state_func_eval(initial_grid_2d_X, initial_grid_2d_Y)
        sub_sv_init_flat = initial_grid_2d.flatten().astype(np.complex128)
        # Host buffer for the full state vector; the grid values occupy the
        # first current_N**2 entries and everything else stays zero.
        full_initial_sv_host = np.zeros(N_sub_per_rank, dtype=np.complex128)
        num_computational_states = current_N * current_N
        if len(sub_sv_init_flat) == num_computational_states:
            if num_computational_states <= N_sub_per_rank:
                full_initial_sv_host[:num_computational_states] = sub_sv_init_flat
            else:
                print(f"Error: Grid data {num_computational_states} > N_sub_per_rank {N_sub_per_rank}")
                return None
        else:
            print(f"Warning: Initial state size {len(sub_sv_init_flat)} != expected {num_computational_states}")
            fill_len = min(len(sub_sv_init_flat), num_computational_states, N_sub_per_rank)
            full_initial_sv_host[:fill_len] = sub_sv_init_flat[:fill_len]
        # Overwrite the allocated state's memory with the host buffer.
        # The values are copied as-is (no normalisation is applied here).
        rank_slice_init = to_cupy_array(initial_state_val)
        print(f'Rank {rank}: Initializing state with {distribution_type} (ux={ux_input}, uy={uy_input})...')
        cp.cuda.runtime.memcpy(rank_slice_init.data.ptr, full_initial_sv_host.ctypes.data, full_initial_sv_host.nbytes, cp.cuda.runtime.memcpyHostToDevice)
        print(f'Rank {rank}: Initial state copied. Size: {len(sub_sv_init_flat)}. N_sub_per_rank: {N_sub_per_rank}')
        print("Starting QLBM evolution...")
        qlbm_obj.run_evolution(initial_state_val, T)
        print("QLBM evolution complete.")
        # --- Collect the snapshots that were written ---------------------
        print("Generating animation with Plotly...")
        downsampled_N = current_N // downsampling_factor
        if downsampled_N == 0 and current_N > 0:
            downsampled_N = 1
        elif current_N == 0:
            print("Error: current_N is zero before Plotly stage.")
            return None
        plotted_timesteps_str = []
        if T == 0:
            if (intermediate_folder_path / f"0_{rank}.npy").exists():
                plotted_timesteps_str.append("0")
        else:
            if timesteps_per_frame > 0:
                for t_step_val in range(timesteps_per_frame, T + 1, timesteps_per_frame):
                    if intermediate_folder_path.joinpath(f"{t_step_val}_{rank}.npy").exists():
                        plotted_timesteps_str.append(str(t_step_val))
            # Always include the final timestep if its file exists.
            final_T_path = intermediate_folder_path.joinpath(f"{T}_{rank}.npy")
            if final_T_path.exists() and str(T) not in plotted_timesteps_str:
                plotted_timesteps_str.append(str(T))
        # De-duplicate and order numerically (the entries are digit strings).
        plotted_timesteps_str = sorted(list(set(plotted_timesteps_str)), key=lambda k_str: int(k_str))
        if not plotted_timesteps_str :
            print(f"Warning: No .npy files found for plotting T={T}. Animation may fail.")
            return None
        # Load each snapshot; files with an unexpected size or that fail to
        # load are skipped with a message rather than aborting the run.
        data_frames_list = []
        actual_timesteps_for_title = []
        for i_str_val in plotted_timesteps_str:
            try:
                file_path_load = intermediate_folder_path / f"{i_str_val}_{rank}.npy"
                sol_loaded = np.load(file_path_load)
                if sol_loaded.size == downsampled_N * downsampled_N:
                    Z_data = np.reshape(sol_loaded, (downsampled_N, downsampled_N))
                    data_frames_list.append(Z_data)
                    actual_timesteps_for_title.append(i_str_val)
                else:
                    print(f"Warning: File {file_path_load} size {sol_loaded.size} != expected {downsampled_N*downsampled_N}. Skipping.")
                    continue
            except FileNotFoundError:
                print(f"Warning: File {file_path_load} not found. Skipping.")
                continue
            except Exception as e_load:
                print(f"Error loading/reshaping {file_path_load}: {e_load}. Skipping.")
                continue
        if not data_frames_list:
            print("Error: No data frames loaded. Cannot create animation.")
            return None
        # --- Colour/z scaling --------------------------------------------
        x_coords_plot = np.linspace(-10, 10, downsampled_N)
        y_coords_plot = np.linspace(-10, 10, downsampled_N)
        # Every frame is divided by the peak |value| of the FIRST frame
        # (1.0 if that frame is empty or all zero).
        norm_factor_plotly = np.max(np.abs(data_frames_list[0])) if data_frames_list[0].size > 0 and np.max(np.abs(data_frames_list[0])) != 0 else 1.0
        if norm_factor_plotly == 0: norm_factor_plotly = 1.0
        # Upper colour/z limit = largest |value| over all frames, in the same
        # normalised units.
        all_data_max_val = norm_factor_plotly
        if len(data_frames_list) > 0: # Should always be true if we reached here
            max_vals_frames = [np.max(np.abs(d_frame)) for d_frame in data_frames_list if d_frame.size > 0]
            if max_vals_frames:
                global_max_val_frames = max(max_vals_frames)
                if global_max_val_frames > 0 : all_data_max_val = global_max_val_frames
        clim_upper_plotly = (all_data_max_val / norm_factor_plotly) * 1.0
        if clim_upper_plotly == 0 : clim_upper_plotly = 0.6
        # --- Output location (persistent, outside the temp directory) ----
        results_base_dir = "Results"
        gif_frames_for_naming = int(simulation_fps * T)
        if gif_frames_for_naming == 0: gif_frames_for_naming = 1
        dist_name_part = distribution_type.replace(' ','').replace('(Original)','').replace('(','').replace(')','')
        specific_folder_name = (f"d2q5_plotly_N{current_N}_T{T}_fr{gif_frames_for_naming}_"
                                f"dist{dist_name_part}_ux{ux_input:.2f}_uy{uy_input:.2f}")
        final_output_dir = Path(results_base_dir) / specific_folder_name
        os.makedirs(final_output_dir, exist_ok=True)
        gif_path_to_return = final_output_dir / "animation_plotly.gif"
        # --- Figure template; only z and the title change per frame ------
        Z_initial_placeholder = np.zeros((downsampled_N, downsampled_N))
        fig = go.Figure(data=[go.Surface(
            z=Z_initial_placeholder, x=x_coords_plot, y=y_coords_plot,
            colorscale='Viridis', cmin=0, cmax=clim_upper_plotly,
            colorbar=dict(
                title=dict(text='Density', side='right', font=dict(color='white')),
                tickfont=dict(size=10, color='white'),
                bgcolor='rgba(0,0,0,0.4)', bordercolor='gray', outlinewidth=1,
                thickness=20, len=0.8, x=0.85, y=0.5
            ))])
        fig.update_layout(
            paper_bgcolor='black', plot_bgcolor='black', font=dict(color='white'),
            scene=dict(
                bgcolor='black',
                xaxis=dict(title=dict(text='X Axis', font=dict(color='white')),
                           tickfont=dict(color='white'),
                           gridcolor='rgba(128,128,128,0.3)', zerolinecolor='rgba(128,128,128,0.5)',
                           linecolor='rgba(128,128,128,0.5)'),
                yaxis=dict(title=dict(text='Y Axis', font=dict(color='white')),
                           tickfont=dict(color='white'),
                           gridcolor='rgba(128,128,128,0.3)', zerolinecolor='rgba(128,128,128,0.5)',
                           linecolor='rgba(128,128,128,0.5)'),
                zaxis=dict(title=dict(text='Density', font=dict(color='white')),
                           tickfont=dict(color='white'),
                           range=[0, clim_upper_plotly], gridcolor='rgba(128,128,128,0.3)',
                           zerolinecolor='rgba(128,128,128,0.5)', linecolor='rgba(128,128,128,0.5)'),
                aspectratio=dict(x=1, y=1, z=0.7),
                camera=dict(eye=dict(x=1.5, y=1.5, z=1.0), center=dict(x=0, y=0, z=-0.1), up=dict(x=0,y=0,z=1))
            ),
            margin=dict(l=10, r=10, b=10, t=60), width=800, height=700
        )
        # --- Render one PNG per frame, then assemble the GIF -------------
        temp_image_files = []
        gif_fps_val = 10
        try:
            for k, Z_frame_data in enumerate(data_frames_list):
                Z_plotly_frame = Z_frame_data / norm_factor_plotly
                fig.data[0].z = Z_plotly_frame
                current_ts_title = actual_timesteps_for_title[k] if k < len(actual_timesteps_for_title) else "Unknown"
                fig.update_layout(
                    title_text=f'QLBM Simulation (Plotly) - Timestep: {current_ts_title}',
                    title_x=0.5, title_font_size=16
                )
                temp_image_path = intermediate_folder_path / f"plotly_frame_{k:04d}.png"
                fig.write_image(str(temp_image_path), engine="kaleido")
                temp_image_files.append(str(temp_image_path))
            if not temp_image_files:
                print("Error: No Plotly frames were successfully generated.")
                # Cleanup potentially created empty directory
                if not any(final_output_dir.iterdir()): # Check if directory is empty
                    try:
                        os.rmdir(final_output_dir)
                    except OSError:
                        pass # Ignore if it fails (e.g. not empty for other reasons)
                return None
            # loop=0 makes the GIF repeat indefinitely.
            with imageio.get_writer(str(gif_path_to_return), mode='I', fps=gif_fps_val, loop=0) as writer:
                for filename in temp_image_files:
                    image = imageio.imread(filename)
                    writer.append_data(image)
            print(f"Plotly animation saved to {gif_path_to_return}")
        except Exception as e_plotly_anim:
            # Any rendering/encoding failure is reported and turned into None.
            print(f"Error during Plotly animation/saving: {e_plotly_anim}")
            print("Ensure 'kaleido' and 'imageio' are installed ('pip install kaleido imageio').")
            return None
        return str(gif_path_to_return)
# --- Gradio Interface Definition ---
def qlbm_gradio_interface(num_reg_qubits_input: int, timescale_input: int, distribution_type_param: str, ux_param: float, uy_param: float):
    """Gradio callback: coerce the widget values and run the simulation.

    Slider values are converted to ``int`` / ``float`` before being handed
    to ``simulate_qlbm_and_animate``.

    Returns:
        The GIF path produced by the simulation, or ``None`` (after showing
        a Gradio warning) when the simulation reports failure.
    """
    n_qubits = int(num_reg_qubits_input)
    n_steps = int(timescale_input)
    vel_x = float(ux_param)
    vel_y = float(uy_param)
    print(f"Gradio Interface: num_reg_qubits={n_qubits}, T={n_steps}, Distribution={distribution_type_param}, ux={vel_x}, uy={vel_y}")

    sim_kwargs = {
        "num_reg_qubits": n_qubits,
        "T": n_steps,
        "distribution_type": distribution_type_param,
        "ux_input": vel_x,
        "uy_input": vel_y,
    }
    animation_path = simulate_qlbm_and_animate(**sim_kwargs)

    # Success: hand the file path straight to the output component.
    if animation_path is not None:
        return animation_path

    gr.Warning("Animation generation failed. Please check console for errors.")
    return None
# Gradio page: parameter controls on the left, resulting GIF on the right.
with gr.Blocks(theme=gr.themes.Soft(), title="QLBM Simulation with Plotly") as qlbm_demo:
    # The backslash in "\\text" is escaped on purpose: in a normal (non-raw)
    # string literal "\t" is a TAB character, which corrupted the formula.
    gr.Markdown(
        """
# ⚛️ Quantum Lattice Boltzmann Method (QLBM) Simulator (Plotly Animation)
Welcome to the Quantum Lattice Boltzmann Method (QLBM) simulator! This version uses Plotly for 3D animation.
**How this Simulator Works:**
This simulator implements a D2Q5 model on a quantum computer simulator (CUDA-Q).
- Control grid size (via Number of Register Qubits: $N=2^{\\text{num_reg_qubits}}$).
- Set total simulation time (Timescale T).
- Choose initial distribution.
- Set advection velocities `ux` and `uy`.
The simulation evolves this state, and a Plotly animation of the density is generated as a GIF.
**Note:** Higher qubit counts and longer timescales are computationally intensive. Advection velocities should be small (e.g., < 0.3).
The output GIF will loop indefinitely. Values for "Number of Register Qubits" above 9 may be unstable or very slow on shared/limited hardware.
"""
    )
    with gr.Row():
        # Left column: simulation parameters and the run button.
        with gr.Column(scale=1):
            gr.Markdown("## Simulation Parameters")
            num_reg_qubits_slider = gr.Slider(
                minimum=2, maximum=10, value=8, step=1, # Max set to 10
                label="Number of Register Qubits (num_reg_qubits)",
                info="Grid N = 2^num_reg_qubits. Max 10 (Note: >8 slow; >9 may hit simulator/memory limits on free tiers)."
            )
            timescale_slider = gr.Slider(
                minimum=0, maximum=2000, value=100, step=10,
                label="Timescale (T)", info="Total number of timesteps. Max 2000."
            )
            distribution_options = ["Sine Wave (Original)", "Gaussian", "Random"]
            distribution_type_input = gr.Radio(
                choices=distribution_options, value="Sine Wave (Original)",
                label="Initial Distribution Type", info="Select the initial pattern of the substance."
            )
            # Velocity range is limited to |u| <= 0.33: the model's direction
            # weights are w * (1 +/- 3u), which turn negative for |u| > 1/3,
            # and the state-preparation matrix takes their square root (NaN).
            ux_slider = gr.Slider(
                minimum=-0.33, maximum=0.33, value=0.2, step=0.01,
                label="Advection Velocity ux", info="x-component of background advection."
            )
            uy_slider = gr.Slider(
                minimum=-0.33, maximum=0.33, value=0.15, step=0.01,
                label="Advection Velocity uy", info="y-component of background advection."
            )
            run_qlbm_btn = gr.Button("Run QLBM Simulation (Plotly)", variant="primary")
        # Right column: the generated animation.
        with gr.Column(scale=2):
            qlbm_plot_output = gr.Image(label="QLBM Simulation Animation (Plotly GIF)", type="filepath", height=600)
    # Input order must match the positional parameters of qlbm_gradio_interface.
    qlbm_inputs_list = [num_reg_qubits_slider, timescale_slider, distribution_type_input, ux_slider, uy_slider]
    run_qlbm_btn.click(
        fn=qlbm_gradio_interface,
        inputs=qlbm_inputs_list,
        outputs=qlbm_plot_output
    )
    # Clickable presets; results are not pre-computed (cache_examples=False).
    gr.Examples(
        examples=[
            [8, 100, "Sine Wave (Original)", 0.2, 0.15],
            [6, 50, "Gaussian", 0.1, 0.05],
            [4, 30, "Random", -0.05, 0.1],
        ],
        inputs=qlbm_inputs_list,
        outputs=qlbm_plot_output,
        fn=qlbm_gradio_interface,
        cache_examples=False
    )
# Script entry point: try to select the GPU target, then start the web UI.
if __name__ == "__main__":
    try:
        cudaq.set_target('nvidia', option='fp64')
        active_target_name = cudaq.get_target().name
        print(f"CUDA-Q Target successfully set to: {active_target_name}")
    except Exception as e_target:
        # Best effort: report the failure and continue with whatever target
        # CUDA-Q currently has selected.
        print(f"Warning: Could not set CUDA-Q target to 'nvidia'. Error: {e_target}")
        fallback_target_name = cudaq.get_target().name
        print(f"Current CUDA-Q Target: {fallback_target_name}. Performance may be affected.")
    qlbm_demo.launch()