# container_trial / app.py
# Uploaded by harishaseebat92 ("Upload 5 files", revision 699c195, verified)
import os
import sys
import time
import math
import threading
import tempfile
import gradio as gr
import cudaq
import numpy as np
import cupy as cp
# from scipy import interpolate # Not used
from pathlib import Path
from matplotlib import pyplot as plt
from matplotlib.animation import FuncAnimation
def _make_initial_distribution(distribution_type, grid_size):
    """Build the initial scalar field ``f(x, y)`` for the requested distribution.

    Args:
        distribution_type: "Sine Wave (Original)", "Gaussian" or "Random".
            Any other value prints a warning and falls back to the sine wave.
        grid_size: Number of lattice sites per axis (N).

    Returns:
        A callable taking two equally shaped 2-D coordinate arrays and
        returning the field values on that grid.
    """
    def sine_wave(x, y):
        return np.sin(x * 2 * np.pi / grid_size) * (1 - 0.5 * x / grid_size) * \
            np.sin(y * 4 * np.pi / grid_size) * (1 - 0.5 * y / grid_size) + 1

    def gaussian(x, y):
        return np.exp(-((x - grid_size / 2)**2 / (2 * (grid_size / 5)**2) +
                        (y - grid_size / 2)**2 / (2 * (grid_size / 5)**2))) * 1.8 + 0.2

    def random_field(x, y):
        return np.random.rand(x.shape[0], x.shape[1]) * 1.5 + 0.2

    profiles = {
        "Sine Wave (Original)": sine_wave,
        "Gaussian": gaussian,
        "Random": random_field,
    }
    profile = profiles.get(distribution_type)
    if profile is None:
        print(f"Warning: Unknown distribution type '{distribution_type}'. Defaulting to Sine Wave.")
        profile = sine_wave

    # The mask zeroes any site whose y coordinate lies outside the grid.
    return lambda x_coords, y_coords: \
        profile(x_coords, y_coords) * (y_coords < grid_size).astype(int)


def _collect_frames(folder, T, timesteps_per_frame, downsampled_N):
    """Load the rank-0 snapshots written during the evolution, in time order.

    Args:
        folder: Directory (``pathlib.Path``) holding the ``<step>_0.npy`` files.
        T: Total number of timesteps that were simulated.
        timesteps_per_frame: Interval between saved snapshots.
        downsampled_N: Expected side length of every (square) snapshot.

    Returns:
        ``(frames, labels)``: the 2-D arrays and the timestep label of each.
        Files that are missing, unreadable or of unexpected size are skipped.
    """
    names = set()
    if timesteps_per_frame > 0:
        for t_step in range(timesteps_per_frame, T + 1, timesteps_per_frame):
            if (folder / f"{t_step}_0.npy").exists():
                names.add(str(t_step))
    if T > 0:
        # Make sure the last state is shown; prefer the "_h" variant if present.
        if (folder / f"{T}_h_0.npy").exists():
            names.add(f"{T}_h")
        elif (folder / f"{T}_0.npy").exists():
            names.add(str(T))
    ordered = sorted(names, key=lambda k: (int(k.replace('_h', '')), k))

    expected_size = downsampled_N * downsampled_N
    data, labels = [], []
    for name in ordered:
        file_path = folder / f"{name}_0.npy"
        try:
            sol = np.load(file_path)
        except FileNotFoundError:
            print(f"Warning: File not found for timestep {name} ({file_path}). Skipping this frame.")
            continue
        except Exception as e:
            # Deliberately best-effort: one bad snapshot must not abort the GIF.
            print(f"Error loading or reshaping file for timestep {name}: {e}. Skipping this frame.")
            continue
        if sol.size != expected_size:
            print(f"Warning: File {file_path} has unexpected size {sol.size}. Expected {expected_size}. Skipping this frame.")
            continue
        data.append(np.reshape(sol, (downsampled_N, downsampled_N)))
        labels.append(name.replace('_h', ''))
    return data, labels


def _save_surface_animation(data, labels, gif_path, fps):
    """Render ``data`` as an animated 3-D surface and write it to ``gif_path``.

    Args:
        data: Non-empty list of equally shaped square 2-D arrays.
        labels: Timestep label shown in the title of each frame.
        gif_path: Destination file.
        fps: Playback rate of the GIF.

    Returns:
        True if the GIF was written, False if saving failed.
    """
    grid_n = data[0].shape[0]
    fig = plt.figure(figsize=(10, 8))
    try:
        ax = fig.add_subplot(111, projection='3d')
        axis_vals = np.linspace(-10, 10, grid_n)
        X_plot, Y_plot = np.meshgrid(axis_vals, axis_vals)
        # Every frame is scaled by the norm of the first one.
        norm_factor = np.linalg.norm(data[0])
        if norm_factor == 0:
            norm_factor = 1.0

        def update_frame(frame_idx):
            ax.clear()
            surf = ax.plot_surface(X_plot, Y_plot, data[frame_idx] / norm_factor,
                                   cmap='viridis', linewidth=0, antialiased=False)
            ax.set_title(f'Quantum Simulation Evolution (Timestep: {labels[frame_idx]})')
            ax.set_xlabel('x')
            ax.set_ylabel('y')
            ax.set_zlim(0, 0.6)
            return surf,

        ani = FuncAnimation(fig, update_frame, frames=len(data), blit=False)
        try:
            ani.save(str(gif_path), writer='pillow', fps=fps)
        except Exception as e:
            print(f"Error saving animation: {e}")
            return False
        print(f"Animation saved to {gif_path}")
        return True
    finally:
        # Always release the figure, including on unexpected errors.
        plt.close(fig)


def simulate_qlbm_and_animate(num_reg_qubits: int, T: int, distribution_type: str, ux_input: float, uy_input: float):
    """
    Simulates a 2D advection-diffusion problem using a Quantum Lattice Boltzmann Method (QLBM)
    and generates a GIF animation of the simulation's evolution.

    Args:
        num_reg_qubits (int): The number of register qubits (determines grid size N=2^num_reg_qubits).
            Must be at least 1.
        T (int): The total number of timesteps to run the simulation. Must be positive.
        distribution_type (str): The type of initial distribution to use.
        ux_input (float): Advection velocity in the x-direction; |ux| must not exceed 1/3.
        uy_input (float): Advection velocity in the y-direction; |uy| must not exceed 1/3.

    Returns:
        str: The file path to the generated GIF animation, or None if the inputs are
        rejected, no frame could be loaded, or the GIF could not be saved.
    """
    # ---- Input validation (done before any GPU resource is touched) ----
    if num_reg_qubits < 1:
        print(f"Error: num_reg_qubits must be >= 1 (got {num_reg_qubits}).")
        return None
    if T <= 0:
        print(f"Warning: T={T}, no frames to plot for animation.")
        return None
    # The direction weights are w * (1 +/- u / cs**2) with cs**2 = 1/3; they go
    # negative for |u| > 1/3 and their square roots (used to build the
    # preparation unitary) would become NaN.
    max_speed = 1.0 / 3.0
    if abs(ux_input) > max_speed or abs(uy_input) > max_speed:
        print(f"Error: |ux| and |uy| must not exceed 1/3 (got ux={ux_input}, uy={uy_input}).")
        return None

    # ---- Frame schedule ----
    simulation_fps = 0.1
    frames = max(1, int(simulation_fps * T))
    timesteps_per_frame = max(1, int(T / frames)) if frames < T else 1

    # ---- Register layout ----
    num_anc = 3
    num_qubits_total = 2 * num_reg_qubits + num_anc
    current_N = 2**num_reg_qubits
    N_tot_state_vector = 2**num_qubits_total
    num_ranks = 1
    rank = 0
    N_sub_per_rank = int(N_tot_state_vector // num_ranks)

    # Keep at least a 2x2 grid per saved frame: a fixed factor of 32 would
    # collapse grids of 32x32 or smaller to a single point.
    downsampling_factor = min(2**5, max(1, current_N // 2))
    downsampled_N = current_N // downsampling_factor

    initial_state_func_eval = _make_initial_distribution(distribution_type, current_N)

    with tempfile.TemporaryDirectory() as tmp_npy_dir:
        intermediate_folder_path = Path(tmp_npy_dir)
        cudaq.set_target('nvidia', option='mgpu,fp64')

        # Allocates the full register; used only to obtain a state buffer.
        @cudaq.kernel
        def alloc_kernel(num_qubits_alloc: int):
            qubits = cudaq.qvector(num_qubits_alloc)

        from cupy.cuda.memory import MemoryPointer, UnownedMemory

        def to_cupy_array(state):
            """View the device buffer behind ``state`` as a 1-D complex128 CuPy array.

            The memory is wrapped, not copied; ``state`` is registered as its owner.
            """
            tensor = state.getTensor()
            pDevice = tensor.data()
            sizeByte = tensor.get_num_elements() * tensor.get_element_size()
            mem = UnownedMemory(pDevice, sizeByte, owner=state)
            memptr_obj = MemoryPointer(mem, 0)
            cupy_array = cp.ndarray(tensor.get_num_elements(),
                                    dtype=cp.complex128,
                                    memptr=memptr_obj)
            return cupy_array

        class QLBMAdvecDiffD2Q5_new:
            """D2Q5 advection-diffusion QLBM stepper built on CUDA-Q kernels."""

            def __init__(self, ux=0.2, uy=0.15) -> None:
                self.dim = 2
                self.ndir = 5
                self.nq_dir = math.ceil(np.log2(self.ndir))
                # Binary label of each direction, zero-padded to nq_dir bits.
                self.dirs = [f"{dir_int:b}".zfill(self.nq_dir) for dir_int in range(self.ndir)]
                self.e_unitvec = np.array([0, 1, -1, 1, -1])
                self.wts = np.array([2/6, 1/6, 1/6, 1/6, 1/6])
                self.cs = 1 / np.sqrt(3)
                self.ux = ux
                self.uy = uy
                self.u = np.array([0, self.ux, self.ux, self.uy, self.uy])
                self.wtcoeffs = np.multiply(self.wts, 1 + self.e_unitvec * self.u / self.cs**2)
                self.create_circuit()

            def create_circuit(self):
                """Register the preparation unitary, define the kernels and set ``run_timestep``."""
                # Reflection 2|v><v| - I with v proportional to sqrt(weights) + |0>;
                # for unit-norm sqrt(weights) it maps |0> onto sqrt(weights).
                v = np.pad(self.wtcoeffs, (0, 2**num_anc - self.ndir))
                v = v**0.5
                v[0] += 1
                v = v / np.linalg.norm(v)
                U_prep = 2 * np.outer(v, v) - np.eye(len(v))
                # NOTE(review): this runs on every call with the same name;
                # confirm CUDA-Q replaces an existing "prep_op" registration.
                cudaq.register_operation("prep_op", U_prep)

                # Flat list of direction bits, each label reversed (last character first).
                self.dirs_i_list = [int(c) for dir_ in self.dirs for c in reversed(dir_)]

                # Treating q[0] as the least-significant bit, adds 1 modulo 2**n.
                @cudaq.kernel
                def rshift(q: cudaq.qview, n: int):
                    for i in range(n):
                        if i == n - 1:
                            x(q[n - 1 - i])
                        elif i == n - 2:
                            x.ctrl(q[n - 1 - (i + 1)], q[n - 1 - i])
                        else:
                            x.ctrl(q[0:n - 1 - i], q[n - 1 - i])

                # Treating q[0] as the least-significant bit, subtracts 1 modulo 2**n.
                @cudaq.kernel
                def lshift(q: cudaq.qview, n: int):
                    for i in range(n):
                        if i == 0:
                            x(q[0])
                        elif i == 1:
                            x.ctrl(q[0], q[1])
                        else:
                            x.ctrl(q[0:i], q[i])

                # Streaming step: for each moving direction, X gates on the zero
                # bits of its label make the all-ones control fire exactly for
                # that label; the same X gates are then undone.
                @cudaq.kernel
                def d2q5_tstep(q: cudaq.qview, nqx: int, nqy: int, nq_dir: int, dirs_i: list[int]):
                    qx = q[0:nqx]
                    qy = q[nqx:nqx + nqy]
                    qdir = q[nqx + nqy:nqx + nqy + nq_dir]
                    # Direction 2: lshift on the x register.
                    i = 2
                    b_list = dirs_i[i * nq_dir:(i + 1) * nq_dir]
                    for j in range(nq_dir):
                        b = b_list[j]
                        if b == 0:
                            x(qdir[j])
                    cudaq.control(lshift, qdir, qx, nqx)
                    for j in range(nq_dir):
                        b = b_list[j]
                        if b == 0:
                            x(qdir[j])
                    # Direction 1: rshift on the x register.
                    i = 1
                    b_list = dirs_i[i * nq_dir:(i + 1) * nq_dir]
                    for j in range(nq_dir):
                        b = b_list[j]
                        if b == 0:
                            x(qdir[j])
                    cudaq.control(rshift, qdir, qx, nqx)
                    for j in range(nq_dir):
                        b = b_list[j]
                        if b == 0:
                            x(qdir[j])
                    # Direction 4: lshift on the y register.
                    i = 4
                    b_list = dirs_i[i * nq_dir:(i + 1) * nq_dir]
                    for j in range(nq_dir):
                        b = b_list[j]
                        if b == 0:
                            x(qdir[j])
                    cudaq.control(lshift, qdir, qy, nqy)
                    for j in range(nq_dir):
                        b = b_list[j]
                        if b == 0:
                            x(qdir[j])
                    # Direction 3: rshift on the y register.
                    i = 3
                    b_list = dirs_i[i * nq_dir:(i + 1) * nq_dir]
                    for j in range(nq_dir):
                        b = b_list[j]
                        if b == 0:
                            x(qdir[j])
                    cudaq.control(rshift, qdir, qy, nqy)
                    for j in range(nq_dir):
                        b = b_list[j]
                        if b == 0:
                            x(qdir[j])

                # One timestep: prep_op, streaming, prep_op.
                @cudaq.kernel
                def d2q5_tstep_wrapper(state_arg: cudaq.State, nqx: int, nqy: int, nq_dir: int, dirs_i: list[int]):
                    q = cudaq.qvector(state_arg)
                    qdir = q[nqx + nqy:nqx + nqy + nq_dir]
                    prep_op(qdir[2], qdir[1], qdir[0])
                    d2q5_tstep(q, nqx, nqy, nq_dir, dirs_i)
                    prep_op(qdir[2], qdir[1], qdir[0])

                # Same timestep followed by a Hadamard on every y-register qubit.
                @cudaq.kernel
                def d2q5_tstep_wrapper_hadamard(vec: list[complex], nqx: int, nqy: int, nq_dir: int, dirs_i: list[int]):
                    q = cudaq.qvector(vec)
                    qdir = q[nqx + nqy:nqx + nqy + nq_dir]
                    qy = q[nqx:nqx + nqy]
                    prep_op(qdir[2], qdir[1], qdir[0])
                    d2q5_tstep(q, nqx, nqy, nq_dir, dirs_i)
                    prep_op(qdir[2], qdir[1], qdir[0])
                    for i in range(nqy):
                        h(qy[i])

                def run_timestep_func(vec, hadamard=False):
                    """Advance ``vec`` by one timestep and zero the discarded amplitudes."""
                    kernel = d2q5_tstep_wrapper_hadamard if hadamard else d2q5_tstep_wrapper
                    result = cudaq.get_state(kernel, vec, num_reg_qubits, num_reg_qubits,
                                             self.nq_dir, self.dirs_i_list)
                    num_nonzero_ranks = num_ranks / (2**num_anc)
                    rank_slice_cupy = to_cupy_array(result)
                    if rank >= num_nonzero_ranks:
                        # This rank holds none of the kept amplitudes: clear its slice.
                        sub_sv_zeros = np.zeros(N_sub_per_rank, dtype=np.complex128)
                        cp.cuda.runtime.memcpy(rank_slice_cupy.data.ptr, sub_sv_zeros.ctypes.data,
                                               sub_sv_zeros.nbytes, cp.cuda.runtime.memcpyHostToDevice)
                    if rank == 0 and num_nonzero_ranks < 1:
                        # Keep only the first N_tot / 2**num_anc amplitudes.
                        sub_sv_get = rank_slice_cupy.get()
                        sub_sv_get[int(N_tot_state_vector / (2**num_anc)):] = 0
                        cp.cuda.runtime.memcpy(rank_slice_cupy.data.ptr, sub_sv_get.ctypes.data,
                                               sub_sv_get.nbytes, cp.cuda.runtime.memcpyHostToDevice)
                    return result

                self.run_timestep = run_timestep_func

            def write_state(self, state_to_write, t_step):
                """Save the downsampled real part of the state as ``<t_step>_<rank>.npy``."""
                rank_slice_cupy = to_cupy_array(state_to_write)
                num_nonzero_ranks = num_ranks / (2**num_anc)
                if rank >= num_nonzero_ranks:
                    return
                save_path = intermediate_folder_path / f"{t_step}_{rank}.npy"
                with open(save_path, 'wb') as f:
                    if num_nonzero_ranks < 1:
                        arr_to_save = cp.real(rank_slice_cupy)[:int(N_tot_state_vector / (2**num_anc))]
                    else:
                        arr_to_save = cp.real(rank_slice_cupy)
                    if len(arr_to_save) > current_N:
                        arr_to_save = arr_to_save.reshape((current_N, current_N))
                        arr_to_save = arr_to_save[::downsampling_factor, ::downsampling_factor]
                        arr_to_save = arr_to_save.flatten()
                    elif len(arr_to_save) > 0:
                        arr_to_save = arr_to_save[::downsampling_factor]
                    np.save(f, arr_to_save.get() if isinstance(arr_to_save, cp.ndarray) else arr_to_save)

            def run_evolution(self, initial_state_arg, total_timesteps, observable=False):
                """Run ``total_timesteps`` steps, saving a snapshot every ``timesteps_per_frame``.

                With ``observable`` set, the last step uses the Hadamard variant and is
                saved under the ``<T>_h`` name. The last state is kept in ``self.final_state``.
                """
                current_state = initial_state_arg
                for t_iter in range(total_timesteps):
                    if t_iter == total_timesteps - 1 and observable:
                        next_state = self.run_timestep(current_state, True)
                        self.write_state(next_state, str(t_iter + 1) + "_h")
                    else:
                        next_state = self.run_timestep(current_state)
                        if (t_iter + 1) % timesteps_per_frame == 0:
                            self.write_state(next_state, t_iter + 1)
                    if rank == 0:
                        print(f"Timestep: {t_iter + 1}/{total_timesteps}")
                    cp.get_default_memory_pool().free_all_blocks()
                    current_state = next_state

                # Save the final state if the loop above did not already do so.
                final_name = f"{total_timesteps}_h" if observable else str(total_timesteps)
                already_saved = (intermediate_folder_path / f"{final_name}_{rank}.npy").exists()
                if not already_saved and total_timesteps > 0:
                    if not observable:
                        self.write_state(current_state, total_timesteps)
                    if rank == 0:
                        print(f"Timestep: {total_timesteps}/{total_timesteps} (Evolution complete)")
                cp.get_default_memory_pool().free_all_blocks()
                self.final_state = current_state

        qlbm_obj = QLBMAdvecDiffD2Q5_new(ux=ux_input, uy=uy_input)

        # ---- Initial state: the field occupies the first N*N amplitudes ----
        initial_state = cudaq.get_state(alloc_kernel, num_qubits_total)
        rank_slice_init = to_cupy_array(initial_state)
        xv_init = np.arange(current_N)
        yv_init = np.arange(current_N)
        print(f'Start initializing state with {distribution_type} distribution (ux={ux_input}, uy={uy_input})...')
        initial_grid_2d = initial_state_func_eval(*np.meshgrid(xv_init, yv_init))
        sub_sv_init_flat = initial_grid_2d.flatten().astype(np.complex128)
        full_initial_sv_host = np.zeros(N_sub_per_rank, dtype=np.complex128)
        num_computational_states = current_N * current_N
        if len(sub_sv_init_flat) != num_computational_states:
            print(f"Warning: Mismatch in expected initial state size {num_computational_states} and calculated {len(sub_sv_init_flat)}")
        full_initial_sv_host[:len(sub_sv_init_flat)] = sub_sv_init_flat
        print(f'Rank {rank}: Initial state (N*N part) prepared. Size: {len(sub_sv_init_flat)}. Total N_sub_per_rank: {N_sub_per_rank}')
        cp.cuda.runtime.memcpy(rank_slice_init.data.ptr, full_initial_sv_host.ctypes.data,
                               full_initial_sv_host.nbytes, cp.cuda.runtime.memcpyHostToDevice)
        print(f'Rank {rank}: Initial state copied to device.')

        print("Starting QLBM evolution...")
        qlbm_obj.run_evolution(initial_state, T)
        print("QLBM evolution complete.")

        print("Generating animation...")
        # Snapshots must be read before the temporary directory is removed.
        data, frame_labels = _collect_frames(intermediate_folder_path, T,
                                             timesteps_per_frame, downsampled_N)

    if not data:
        print("Error: No data available to create animation. Check .npy file generation and paths.")
        return None

    # ---- Output location ----
    results_base_dir = "Results"
    dist_name_part = str(distribution_type).replace(' ', '').replace('(Original)', '').replace('(', '').replace(')', '')
    # Keep only filename-safe characters; the value originates from the UI.
    dist_name_part = "".join(ch for ch in dist_name_part if ch.isalnum() or ch in "_-")
    specific_folder_name = (f"d2q5_nq{current_N}x{current_N}_T{T}_fr{frames}_"
                            f"dist{dist_name_part}_ux{ux_input:.2f}_uy{uy_input:.2f}")
    final_output_dir = Path(results_base_dir) / specific_folder_name
    os.makedirs(final_output_dir, exist_ok=True)
    gif_path_to_return = final_output_dir / "animation.gif"

    gif_animation_fps = 10
    if not _save_surface_animation(data, frame_labels, gif_path_to_return, gif_animation_fps):
        return None
    return str(gif_path_to_return)
# --- Gradio Interface Definition ---
def qlbm_gradio_interface(num_reg_qubits_input: int, timescale_input: int, distribution_type_param: str, ux_param: float, uy_param: float):
    """Gradio callback: coerce the widget values, run the simulation, return the GIF path.

    Args:
        num_reg_qubits_input: Register-qubit count from the slider (cast to int).
        timescale_input: Number of timesteps from the slider (cast to int).
        distribution_type_param: Selected initial distribution name.
        ux_param: Advection velocity along x (cast to float).
        uy_param: Advection velocity along y (cast to float).

    Returns:
        The path of the generated GIF, or None (after showing a Gradio warning)
        when the simulation produced no animation.
    """
    n_qubits = int(num_reg_qubits_input)
    n_steps = int(timescale_input)
    vel_x = float(ux_param)
    vel_y = float(uy_param)
    print(f"Gradio Interface: num_reg_qubits={n_qubits}, T={n_steps}, Distribution={distribution_type_param}, ux={vel_x}, uy={vel_y}")

    animation_path = simulate_qlbm_and_animate(
        num_reg_qubits=n_qubits,
        T=n_steps,
        distribution_type=distribution_type_param,
        ux_input=vel_x,
        uy_input=vel_y,
    )
    if animation_path is not None:
        return animation_path

    # Surface the failure in the UI; details are in the server console.
    gr.Warning("Animation generation failed. Please check console for errors.")
    return None
# Gradio UI: parameter controls on the left, resulting animation on the right.
with gr.Blocks(theme=gr.themes.Soft(), title="QLBM Simulation") as qlbm_demo:
    # Inline `$...$` math is only rendered when it is listed as a delimiter;
    # the component's default recognises `$$...$$` only.
    gr.Markdown(
        """
# ⚛️ Quantum Lattice Boltzmann Method (QLBM) Simulator

Welcome to the Quantum Lattice Boltzmann Method (QLBM) simulator!

**What is QLBM?**
The Lattice Boltzmann Method (LBM) is a computational fluid dynamics technique for simulating fluid flow. QLBM is its quantum counterpart, leveraging quantum algorithms to potentially offer advantages for certain types of simulations. It's used here to model advection-diffusion phenomena, which describe how substances are transported due to bulk motion (advection) and spread out due to random motion (diffusion).

**How this Simulator Works:**
This simulator implements a D2Q5 model (2 dimensions, 5 velocity directions) on a quantum computer simulator (using CUDA-Q).

- You can control the grid size (via Number of Register Qubits, where grid size is $2^N \\times 2^N$).
- You can set the total simulation time (Timescale T).
- You can choose the initial distribution of the substance on the grid.
- You can set the advection velocities `ux` (x-direction) and `uy` (y-direction).

The simulation will then evolve this initial state over time, and an animation of the substance's density will be generated.

**Note:** Simulations with higher qubit counts and longer timescales can be computationally intensive and may take a significant amount of time to complete.
The $N$ in $2^N \\times 2^N$ grid refers to `num_reg_qubits`. Advection velocities `ux` and `uy` should generally be kept small (e.g., < 0.3) for model stability.
""",
        latex_delimiters=[{"left": "$", "right": "$", "display": False}],
    )
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("## Simulation Parameters")
            num_reg_qubits_slider = gr.Slider(
                minimum=2,
                maximum=11,
                value=8,
                step=1,
                label="Number of Register Qubits (num_reg_qubits)",
                info="Determines the N for grid size (2^N x 2^N). Max 11 (Note: >8 can be very slow)."
            )
            timescale_slider = gr.Slider(
                minimum=10,
                maximum=2000,
                value=100,
                step=10,
                label="Timescale (T)",
                info="Total number of timesteps. Max 2000 (Note: higher values increase run time significantly)."
            )
            distribution_options = ["Sine Wave (Original)", "Gaussian", "Random"]
            distribution_type_input = gr.Radio(
                choices=distribution_options,
                value="Sine Wave (Original)",
                label="Initial Distribution Type",
                info="Select the initial pattern of the substance on the grid."
            )
            # The D2Q5 weights w * (1 +/- 3u) turn negative for |u| > 1/3, so
            # the sliders stop just below that limit.
            ux_slider = gr.Slider(
                minimum=-0.33,
                maximum=0.33,
                value=0.2,
                step=0.01,
                label="Advection Velocity ux",
                info="x-component of the background advection velocity."
            )
            uy_slider = gr.Slider(
                minimum=-0.33,
                maximum=0.33,
                value=0.15,
                step=0.01,
                label="Advection Velocity uy",
                info="y-component of the background advection velocity."
            )
            run_qlbm_btn = gr.Button("Run QLBM Simulation", variant="primary")
        with gr.Column(scale=3):
            qlbm_plot_output = gr.Image(label="QLBM Simulation Animation", type="filepath")

    # The same input list feeds both the button and the clickable examples.
    qlbm_inputs_list = [num_reg_qubits_slider, timescale_slider, distribution_type_input, ux_slider, uy_slider]
    run_qlbm_btn.click(
        fn=qlbm_gradio_interface,
        inputs=qlbm_inputs_list,
        outputs=qlbm_plot_output
    )
    gr.Examples(
        examples=[
            [8, 100, "Sine Wave (Original)", 0.2, 0.15],
            [6, 50, "Gaussian", 0.1, 0.05],
            [4, 30, "Random", -0.05, 0.1],
            [7, 70, "Sine Wave (Original)", 0.0, 0.0]  # Example with no advection
        ],
        inputs=qlbm_inputs_list,
        outputs=qlbm_plot_output,
        fn=qlbm_gradio_interface,
        # Examples only fill in the inputs; nothing is precomputed at startup.
        cache_examples=False
    )

if __name__ == "__main__":
    qlbm_demo.launch()