harishaseebat92's picture
Update app.py
db4565c verified
import os
import math
import tempfile
import gradio as gr
import cudaq
import numpy as np
import cupy as cp
from pathlib import Path
import plotly.graph_objects as go
import plotly.io as pio
import imageio
from scipy.spatial import Delaunay
# Set Plotly engine for image export
try:
pio.kaleido.scope.mathjax = None
except AttributeError:
pass
def simulate_qlbm_and_animate(num_reg_qubits: int, T: int, distribution_type: str, ux_input: float, uy_input: float, velocity_field_type: str):
"""
Simulates a 2D advection-diffusion problem using a Quantum Lattice Boltzmann Method (QLBM)
and generates an interactive Plotly figure with a slider for selected time steps.
"""
num_anc = 3
num_qubits_total = 2 * num_reg_qubits + num_anc
current_N = 2**num_reg_qubits
N_tot_state_vector = 2**num_qubits_total
num_ranks = 1
rank = 0
N_sub_per_rank = int(N_tot_state_vector // num_ranks)
# Initial state setup
if distribution_type == "Sine Wave (Original)":
selected_initial_state_function_raw = lambda x, y, N_val_func: \
np.sin(x * 2 * np.pi / N_val_func) * (1 - 0.5 * x / N_val_func) * \
np.sin(y * 4 * np.pi / N_val_func) * (1 - 0.5 * y / N_val_func) + 1
elif distribution_type == "Gaussian":
selected_initial_state_function_raw = lambda x, y, N_val_func: \
np.exp(-((x - N_val_func / 2)**2 / (2 * (N_val_func / 5)**2) +
(y - N_val_func / 2)**2 / (2 * (N_val_func / 5)**2))) * 1.8 + 0.2
elif distribution_type == "Random":
selected_initial_state_function_raw = lambda x, y, N_val_func: \
np.random.rand(N_val_func, N_val_func) * 1.5 + 0.2 if isinstance(x, int) else \
np.random.rand(x.shape[0], x.shape[1]) * 1.5 + 0.2
else:
print(f"Warning: Unknown distribution type '{distribution_type}'. Defaulting to Sine Wave.")
selected_initial_state_function_raw = lambda x, y, N_val_func: \
np.sin(x * 2 * np.pi / N_val_func) * (1 - 0.5 * x / N_val_func) * \
np.sin(y * 4 * np.pi / N_val_func) * (1 - 0.5 * y / N_val_func) + 1
initial_state_func_eval = lambda x_coords, y_coords: \
selected_initial_state_function_raw(x_coords, y_coords, current_N) * \
(y_coords < current_N).astype(int)
with tempfile.TemporaryDirectory() as tmp_npy_dir:
intermediate_folder_path = Path(tmp_npy_dir)
cudaq.set_target('nvidia', option='fp64')
@cudaq.kernel
def alloc_kernel(num_qubits_alloc: int):
qubits = cudaq.qvector(num_qubits_alloc)
from cupy.cuda.memory import MemoryPointer, UnownedMemory
def to_cupy_array(state):
tensor = state.getTensor()
pDevice = tensor.data()
sizeByte = tensor.get_num_elements() * tensor.get_element_size()
mem = UnownedMemory(pDevice, sizeByte, owner=state)
memptr_obj = MemoryPointer(mem, 0)
cupy_array_val = cp.ndarray(tensor.get_num_elements(),
dtype=cp.complex128,
memptr=memptr_obj)
return cupy_array_val
class QLBMAdvecDiffD2Q5_new:
def __init__(self, ux=0.2, uy=0.15) -> None:
self.dim = 2
self.ndir = 5
self.nq_dir = math.ceil(np.log2(self.ndir))
self.dirs = []
for dir_int in range(self.ndir):
dir_bin = f"{dir_int:b}".zfill(self.nq_dir)
self.dirs.append(dir_bin)
self.e_unitvec = np.array([0, 1, -1, 1, -1])
self.wts = np.array([2/6, 1/6, 1/6, 1/6, 1/6])
self.cs = 1 / np.sqrt(3)
self.ux = ux
self.uy = uy
self.u = np.array([0, self.ux, self.ux, self.uy, self.uy])
self.wtcoeffs = np.multiply(self.wts, 1 + self.e_unitvec * self.u / self.cs**2)
self.create_circuit()
def create_circuit(self):
v = np.pad(self.wtcoeffs, (0, 2**num_anc - self.ndir))
v = v**0.5
v[0] += 1 # This was missing in your original code
v = v / np.linalg.norm(v)
U_prep = 2 * np.outer(v, v) - np.eye(len(v))
cudaq.register_operation("prep_op", U_prep)
def collisionOp(dirs_list):
dirs_i_list_val = []
for dir_str in dirs_list:
dirs_i = [int(c) for c in dir_str]
dirs_i_list_val += dirs_i[::-1]
return dirs_i_list_val
self.dirs_i_list = collisionOp(self.dirs)
@cudaq.kernel
def rshift(q: cudaq.qview, n: int):
for i in range(n):
if i == n - 1:
x(q[n - 1 - i])
elif i == n - 2:
x.ctrl(q[n - 1 - (i + 1)], q[n - 1 - i])
else:
x.ctrl(q[0:n - 1 - i], q[n - 1 - i])
@cudaq.kernel
def lshift(q: cudaq.qview, n: int):
for i in range(n):
if i == 0:
x(q[0])
elif i == 1:
x.ctrl(q[0], q[1])
else:
x.ctrl(q[0:i], q[i])
@cudaq.kernel
def d2q5_tstep(q: cudaq.qview, nqx: int, nqy: int, nq_dir_val: int, dirs_i_val: list[int]):
qx = q[0:nqx]
qy = q[nqx:nqx + nqy]
qdir = q[nqx + nqy:nqx + nqy + nq_dir_val]
idx_lqx = 2
b_list = dirs_i_val[idx_lqx * nq_dir_val:(idx_lqx + 1) * nq_dir_val]
for j in range(nq_dir_val):
if b_list[j] == 0: x(qdir[j])
cudaq.control(lshift, qdir, qx, nqx)
for j in range(nq_dir_val):
if b_list[j] == 0: x(qdir[j])
idx_rqx = 1
b_list = dirs_i_val[idx_rqx * nq_dir_val:(idx_rqx + 1) * nq_dir_val]
for j in range(nq_dir_val):
if b_list[j] == 0: x(qdir[j])
cudaq.control(rshift, qdir, qx, nqx)
for j in range(nq_dir_val):
if b_list[j] == 0: x(qdir[j])
idx_lqy = 4
b_list = dirs_i_val[idx_lqy * nq_dir_val:(idx_lqy + 1) * nq_dir_val]
for j in range(nq_dir_val):
if b_list[j] == 0: x(qdir[j])
cudaq.control(lshift, qdir, qy, nqy)
for j in range(nq_dir_val):
if b_list[j] == 0: x(qdir[j])
idx_rqy = 3
b_list = dirs_i_val[idx_rqy * nq_dir_val:(idx_rqy + 1) * nq_dir_val]
for j in range(nq_dir_val):
if b_list[j] == 0: x(qdir[j])
cudaq.control(rshift, qdir, qy, nqy)
for j in range(nq_dir_val):
if b_list[j] == 0: x(qdir[j])
@cudaq.kernel
def d2q5_tstep_wrapper(state_arg: cudaq.State, nqx: int, nqy: int, nq_dir_val: int, dirs_i_val: list[int]):
q = cudaq.qvector(state_arg)
qdir = q[nqx + nqy:nqx + nqy + nq_dir_val]
prep_op(qdir[2], qdir[1], qdir[0]) # Uncommented
d2q5_tstep(q, nqx, nqy, nq_dir_val, dirs_i_val)
prep_op(qdir[2], qdir[1], qdir[0]) # Uncommented
@cudaq.kernel
def d2q5_tstep_wrapper_hadamard(vec_arg: list[complex], nqx: int, nqy: int, nq_dir_val: int, dirs_i_val: list[int]):
q = cudaq.qvector(vec_arg)
qdir = q[nqx + nqy:nqx + nqy + nq_dir_val]
qy = q[nqx:nqx + nqy]
prep_op(qdir[2], qdir[1], qdir[0]) # Uncommented
d2q5_tstep(q, nqx, nqy, nq_dir_val, dirs_i_val)
prep_op(qdir[2], qdir[1], qdir[0]) # Uncommented
for i in range(nqy):
h(qy[i])
def run_timestep_func(vec_arg, hadamard=False):
if hadamard:
result = cudaq.get_state(d2q5_tstep_wrapper_hadamard, vec_arg, num_reg_qubits, num_reg_qubits, self.nq_dir, self.dirs_i_list)
else:
result = cudaq.get_state(d2q5_tstep_wrapper, vec_arg, num_reg_qubits, num_reg_qubits, self.nq_dir, self.dirs_i_list)
num_nonzero_ranks = num_ranks / (2**num_anc)
rank_slice_cupy = to_cupy_array(result)
if rank >= num_nonzero_ranks and num_nonzero_ranks > 0:
sub_sv_zeros = np.zeros(N_sub_per_rank, dtype=np.complex128)
cp.cuda.runtime.memcpy(rank_slice_cupy.data.ptr, sub_sv_zeros.ctypes.data, sub_sv_zeros.nbytes, cp.cuda.runtime.memcpyHostToDevice)
if rank == 0 and num_nonzero_ranks < 1 and N_sub_per_rank > 0:
limit_idx = int(N_tot_state_vector / (2**num_anc))
if limit_idx < rank_slice_cupy.size:
rank_slice_cupy[limit_idx:] = 0
return result
self.run_timestep = run_timestep_func
def write_state(self, state_to_write, t_step_str_val):
rank_slice_cupy = to_cupy_array(state_to_write)
num_nonzero_ranks = num_ranks / (2**num_anc)
if rank < num_nonzero_ranks or (rank == 0 and num_nonzero_ranks <= 0):
save_path = intermediate_folder_path / f"{t_step_str_val}_{rank}.npy"
with open(save_path, 'wb') as f:
arr_to_save = None
data_limit = N_sub_per_rank
if num_nonzero_ranks < 1 and rank == 0:
data_limit = int(N_tot_state_vector / (2**num_anc))
if data_limit > 0:
relevant_part_cupy = cp.real(rank_slice_cupy[:data_limit])
else:
relevant_part_cupy = cp.array([], dtype=cp.float64)
if relevant_part_cupy.size >= current_N * current_N:
arr_flat = relevant_part_cupy[:current_N * current_N]
if downsampling_factor > 1 and current_N > 0:
arr_reshaped = arr_flat.reshape((current_N, current_N))
arr_downsampled = arr_reshaped[::downsampling_factor, ::downsampling_factor]
arr_to_save = arr_downsampled.flatten()
else:
arr_to_save = arr_flat
elif relevant_part_cupy.size > 0:
if downsampling_factor > 1:
arr_to_save = relevant_part_cupy[::downsampling_factor]
else:
arr_to_save = relevant_part_cupy
if arr_to_save is not None and arr_to_save.size > 0:
np.save(f, arr_to_save.get() if isinstance(arr_to_save, cp.ndarray) else arr_to_save)
def run_evolution(self, initial_state_arg, total_timesteps, observable=False):
current_state_val = initial_state_arg
self.write_state(current_state_val, '0') # Save initial state
# Save data at regular intervals for better slider functionality
save_interval = max(1, total_timesteps // 10) # Save every 10% of simulation
for t_iter in range(total_timesteps):
next_state_val = None
if t_iter == total_timesteps - 1 and observable:
next_state_val = self.run_timestep(current_state_val, True)
self.write_state(next_state_val, str(t_iter + 1) + "_h")
else:
next_state_val = self.run_timestep(current_state_val)
# Save at regular intervals AND at specific timesteps
if (t_iter + 1) % save_interval == 0 or t_iter + 1 in [total_timesteps//4, 3*total_timesteps//4, total_timesteps]:
self.write_state(next_state_val, str(t_iter + 1))
if rank == 0 and (t_iter + 1) % 10 == 0:
print(f"Timestep: {t_iter + 1}/{total_timesteps}")
cp.get_default_memory_pool().free_all_blocks()
current_state_val = next_state_val
if rank == 0:
print(f"Timestep: {total_timesteps}/{total_timesteps} (Evolution complete)")
cp.get_default_memory_pool().free_all_blocks()
self.final_state = current_state_val
downsampling_factor = 2**5
if current_N == 0:
print("Error: current_N is zero. num_reg_qubits likely too small.")
return None
if current_N < downsampling_factor:
downsampling_factor = current_N if current_N > 0 else 1
qlbm_obj = QLBMAdvecDiffD2Q5_new(ux=ux_input, uy=uy_input)
initial_state_val = cudaq.get_state(alloc_kernel, num_qubits_total)
xv_init = np.arange(current_N)
yv_init = np.arange(current_N)
initial_grid_2d_X, initial_grid_2d_Y = np.meshgrid(xv_init, yv_init)
if distribution_type == "Random":
initial_grid_2d = selected_initial_state_function_raw(current_N, current_N, current_N)
else:
initial_grid_2d = initial_state_func_eval(initial_grid_2d_X, initial_grid_2d_Y)
sub_sv_init_flat = initial_grid_2d.flatten().astype(np.complex128)
full_initial_sv_host = np.zeros(N_sub_per_rank, dtype=np.complex128)
num_computational_states = current_N * current_N
if len(sub_sv_init_flat) == num_computational_states:
if num_computational_states <= N_sub_per_rank:
full_initial_sv_host[:num_computational_states] = sub_sv_init_flat
else:
print(f"Error: Grid data {num_computational_states} > N_sub_per_rank {N_sub_per_rank}")
return None
else:
print(f"Warning: Initial state size {len(sub_sv_init_flat)} != expected {num_computational_states}")
fill_len = min(len(sub_sv_init_flat), num_computational_states, N_sub_per_rank)
full_initial_sv_host[:fill_len] = sub_sv_init_flat[:fill_len]
rank_slice_init = to_cupy_array(initial_state_val)
print(f'Rank {rank}: Initializing state with {distribution_type} (ux={ux_input}, uy={uy_input})...')
cp.cuda.runtime.memcpy(rank_slice_init.data.ptr, full_initial_sv_host.ctypes.data, full_initial_sv_host.nbytes, cp.cuda.runtime.memcpyHostToDevice)
print(f'Rank {rank}: Initial state copied. Size: {len(sub_sv_init_flat)}. N_sub_per_rank: {N_sub_per_rank}')
print("Starting QLBM evolution...")
qlbm_obj.run_evolution(initial_state_val, T)
print("QLBM evolution complete.")
print("Generating plots with Plotly...")
downsampled_N = current_N // downsampling_factor
if downsampled_N == 0 and current_N > 0:
downsampled_N = 1
elif current_N == 0:
print("Error: current_N is zero before Plotly stage.")
return None
# Load more timesteps for better slider experience
time_steps_to_load = list(range(0, T+1, max(1, T//10))) + [T] # 10 steps plus final
time_steps_to_load = sorted(list(set(time_steps_to_load))) # Remove duplicates and sort
data_frames = []
actual_timesteps_loaded = []
for t in time_steps_to_load:
file_path = intermediate_folder_path / f"{t}_{rank}.npy"
if file_path.exists():
sol_loaded = np.load(file_path)
if sol_loaded.size == downsampled_N * downsampled_N:
Z_data = np.reshape(sol_loaded, (downsampled_N, downsampled_N))
data_frames.append(Z_data)
actual_timesteps_loaded.append(t)
else:
print(f"Warning: File {file_path} size {sol_loaded.size} != expected {downsampled_N*downsampled_N}. Skipping.")
else:
print(f"Warning: File {file_path} not found. Skipping.")
if not data_frames:
print("Error: No data frames loaded for interactive plot.")
return None
x_coords_plot = np.linspace(-10, 10, downsampled_N)
y_coords_plot = np.linspace(-10, 10, downsampled_N)
# Calculate global min/max for consistent scaling
z_min = min([np.min(Z) for Z in data_frames])
z_max = max([np.max(Z) for Z in data_frames])
if z_max == z_min:
z_max += 1e-9
# Create interactive Plotly figure with slider
fig = go.Figure()
# Add all traces (one for each timestep)
for i, Z in enumerate(data_frames):
fig.add_trace(
go.Surface(
z=Z, x=x_coords_plot, y=y_coords_plot,
colorscale='Viridis',
cmin=z_min, cmax=z_max,
name=f'Time: {actual_timesteps_loaded[i]}',
visible=(i == 0), # Only first trace visible initially
showscale=(i == 0) # Only show colorbar for first trace
)
)
# Create slider steps correctly
steps = []
for i in range(len(data_frames)):
step = dict(
method="update",
args=[{"visible": [False] * len(data_frames)}],
label=f"Time: {actual_timesteps_loaded[i]}"
)
step["args"][0]["visible"][i] = True # Make only the i-th trace visible
steps.append(step)
# Configure sliders properly
sliders = [dict(
active=0,
currentvalue={"prefix": "Time: "},
pad={"t": 50},
steps=steps
)]
fig.update_layout(
title='QLBM Simulation - Density Evolution',
scene=dict(
xaxis_title='X',
yaxis_title='Y',
zaxis_title='Density',
xaxis=dict(range=[x_coords_plot[0], x_coords_plot[-1]]),
yaxis=dict(range=[y_coords_plot[0], y_coords_plot[-1]]),
zaxis=dict(range=[z_min, z_max]),
),
sliders=sliders,
width=1000,
height=900
)
return fig
# Gradio Interface Definition
def qlbm_gradio_interface(num_reg_qubits_input: int, timescale_input: int, distribution_type_param: str, ux_param: float, uy_param: float, velocity_field_type_param: str):
num_reg_qubits_val = int(num_reg_qubits_input)
timescale_val = int(timescale_input)
ux_val = float(ux_param)
uy_val = float(uy_param)
print(f"Gradio Interface: num_reg_qubits={num_reg_qubits_val}, T={timescale_val}, Distribution={distribution_type_param}, ux={ux_val}, uy={uy_val}, VelocityFieldType={velocity_field_type_param}")
plot_fig = simulate_qlbm_and_animate(
num_reg_qubits=num_reg_qubits_val,
T=timescale_val,
distribution_type=distribution_type_param,
ux_input=ux_val,
uy_input=uy_val,
velocity_field_type=velocity_field_type_param
)
if plot_fig is None:
gr.Warning("Simulation or plotting failed. Please check console for errors.")
return None
return plot_fig
with gr.Blocks(theme=gr.themes.Soft(), title="QLBM Simulation with Plotly") as qlbm_demo:
gr.Markdown(
"""
# ⚛️ Quantum Lattice Boltzmann Method (QLBM) Simulator (Plotly Animation)
Welcome to the Quantum Lattice Boltzmann Method (QLBM) simulator! This version uses Plotly for 3D animation and interactive plots.
**How this Simulator Works:**
This simulator implements a D2Q5 model on a quantum computer simulator (CUDA-Q).
- Control grid size (via Number of Register Qubits: $N=2^{\text{num_reg_qubits}}$).
- Set total simulation time (Timescale T).
- Choose initial distribution.
- Set advection velocities `ux` and `uy`.
The simulation generates an interactive Plotly figure with a slider for selected time steps.
**Note:** Higher qubit counts and longer timescales are computationally intensive. Advection velocities should be small (e.g., < 0.3).
The Plotly figure allows interactive exploration of specific time steps.
"""
)
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("## Simulation Parameters")
num_reg_qubits_slider = gr.Slider(
minimum=2, maximum=10, value=8, step=1,
label="Number of Register Qubits (num_reg_qubits)",
info="Grid N = 2^num_reg_qubits. Max 10 (Note: >8 slow; >9 may hit simulator/memory limits on free tiers)."
)
timescale_slider = gr.Slider(
minimum=0, maximum=2000, value=100, step=10,
label="Timescale (T)", info="Total number of timesteps. Max 2000."
)
with gr.Accordion("Initial Conditions", open=True):
distribution_options = ["Sine Wave (Original)", "Gaussian", "Random"]
distribution_type_input = gr.Radio(
choices=distribution_options, value="Sine Wave (Original)",
label="Initial Distribution Type", info="Select the initial pattern of the substance."
)
with gr.Accordion("Velocity Fields", open=True):
velocity_field_options = ["Uniform", "Vortex", "Shear"]
velocity_field_type_input = gr.Radio(
choices=velocity_field_options, value="Uniform",
label="Velocity Field Type", info="Select the type of background velocity field."
)
ux_slider = gr.Slider(
minimum=-0.4, maximum=0.4, value=0.2, step=0.01,
label="Advection Velocity ux", info="x-component of background advection."
)
uy_slider = gr.Slider(
minimum=-0.4, maximum=0.4, value=0.15, step=0.01,
label="Advection Velocity uy", info="y-component of background advection."
)
run_qlbm_btn = gr.Button("Run QLBM Simulation", variant="primary")
with gr.Column(scale=2):
qlbm_interactive_plot = gr.Plot(label="Interactive Density Plot with Slider")
qlbm_inputs_list = [num_reg_qubits_slider, timescale_slider, distribution_type_input, ux_slider, uy_slider, velocity_field_type_input]
run_qlbm_btn.click(
fn=qlbm_gradio_interface,
inputs=qlbm_inputs_list,
outputs=[qlbm_interactive_plot]
)
gr.Examples(
examples=[[6, 50, "Gaussian", 0.1, 0.05, "Uniform"]],
inputs=qlbm_inputs_list,
outputs=[qlbm_interactive_plot],
fn=qlbm_gradio_interface,
cache_examples=False
)
if __name__ == "__main__":
try:
cudaq.set_target('nvidia', option='fp64')
print(f"CUDA-Q Target successfully set to: {cudaq.get_target().name}")
except Exception as e_target:
print(f"Warning: Could not set CUDA-Q target to 'nvidia'. Error: {e_target}")
print(f"Current CUDA-Q Target: {cudaq.get_target().name}. Performance may be affected.")
qlbm_demo.launch()