"""Gradio front end for a 2D advection-diffusion QLBM (D2Q5) demo running on CUDA-Q.

The module builds a Gradio ``Blocks`` app (``qlbm_demo``) whose button runs
``simulate_qlbm_and_animate`` and shows the resulting Plotly figure.
"""
import math
import os  # noqa: F401  (kept: unused here, but retained from the original import list)
import tempfile
from pathlib import Path
from typing import Optional

import cudaq
import cupy as cp
import gradio as gr
import imageio  # noqa: F401  (kept: unused here, but retained from the original import list)
import numpy as np
import plotly.graph_objects as go
import plotly.io as pio
from cupy.cuda.memory import MemoryPointer, UnownedMemory
from scipy.spatial import Delaunay  # noqa: F401  (kept: unused here, retained from the original)

# Set Plotly engine for image export
try:
    pio.kaleido.scope.mathjax = None
except AttributeError:
    # Raised when the kaleido scope is unavailable; image export is optional here.
    pass

# Largest stride used when thinning the N x N grid before plotting.
_MAX_DOWNSAMPLING_FACTOR = 2**5
# Never thin the grid below this many points per axis (when the grid has that many).
_MIN_PLOT_POINTS_PER_AXIS = 8
# The D2Q5 weights are w_i * (1 +/- u / cs^2) with cs^2 = 1/3; they turn negative
# (and their square roots NaN) once |u| exceeds 1/3.
_MAX_ADVECTION_SPEED = 1.0 / 3.0

_INTRO_MARKDOWN = """\
# ⚛️ Quantum Lattice Boltzmann Method (QLBM) Simulator (Plotly Animation)

Welcome to the Quantum Lattice Boltzmann Method (QLBM) simulator! This version uses Plotly for 3D animation and interactive plots.

**How this Simulator Works:**
This simulator implements a D2Q5 model on a quantum computer simulator (CUDA-Q).
- Control grid size (via Number of Register Qubits: `N = 2^num_reg_qubits`).
- Set total simulation time (Timescale T).
- Choose initial distribution.
- Set advection velocities `ux` and `uy`.

The simulation generates an interactive Plotly figure with a slider for selected time steps.

**Note:** Higher qubit counts and longer timescales are computationally intensive. Advection velocities should be small (e.g., < 0.3). The Plotly figure allows interactive exploration of specific time steps.
"""


def _sine_wave_density(x, y, n):
    """Return the damped sine-product initial density on grid coordinates ``x``, ``y``."""
    return (
        np.sin(x * 2 * np.pi / n) * (1 - 0.5 * x / n)
        * np.sin(y * 4 * np.pi / n) * (1 - 0.5 * y / n)
        + 1
    )


def _gaussian_density(x, y, n):
    """Return a Gaussian bump centred on the grid with standard deviation ``n / 5``."""
    sigma_sq = (n / 5) ** 2
    exponent = (x - n / 2) ** 2 / (2 * sigma_sq) + (y - n / 2) ** 2 / (2 * sigma_sq)
    return np.exp(-exponent) * 1.8 + 0.2


def _random_density(x, y, n):
    """Return uniform random densities in [0.2, 1.7) with the same shape as ``x``."""
    return np.random.rand(*np.shape(x)) * 1.5 + 0.2


# Maps the UI label of each initial distribution to the function that evaluates it.
_INITIAL_DISTRIBUTIONS = {
    "Sine Wave (Original)": _sine_wave_density,
    "Gaussian": _gaussian_density,
    "Random": _random_density,
}


def _choose_downsampling_factor(grid_size: int) -> int:
    """Return the plotting stride for a ``grid_size`` x ``grid_size`` grid.

    The stride is capped at ``_MAX_DOWNSAMPLING_FACTOR`` and reduced for small
    grids so that at least ``_MIN_PLOT_POINTS_PER_AXIS`` points per axis remain
    (or the full grid when it is smaller than that).
    """
    return max(1, min(_MAX_DOWNSAMPLING_FACTOR, grid_size // _MIN_PLOT_POINTS_PER_AXIS))


def _snapshot_timesteps(total_timesteps: int) -> list:
    """Return the sorted timesteps at which the state is saved and later plotted.

    Covers step 0, every ``max(1, total_timesteps // 10)``-th step, the quarter
    points ``T//4`` and ``3T//4``, and the final step.
    """
    interval = max(1, total_timesteps // 10)
    steps = set(range(0, total_timesteps + 1, interval))
    steps.update((total_timesteps // 4, 3 * total_timesteps // 4, total_timesteps))
    return sorted(steps)


def _load_snapshots(folder: Path, timesteps, rank: int, downsampled_n: int):
    """Load saved ``{t}_{rank}.npy`` snapshots as 2D arrays.

    Returns ``(frames, loaded_timesteps)``; missing or wrongly sized files are
    reported on stdout and skipped.
    """
    frames = []
    loaded_timesteps = []
    expected_size = downsampled_n * downsampled_n
    for t in timesteps:
        file_path = folder / f"{t}_{rank}.npy"
        if not file_path.exists():
            print(f"Warning: File {file_path} not found. Skipping.")
            continue
        sol_loaded = np.load(file_path)
        if sol_loaded.size != expected_size:
            print(f"Warning: File {file_path} size {sol_loaded.size} != expected {expected_size}. Skipping.")
            continue
        frames.append(np.reshape(sol_loaded, (downsampled_n, downsampled_n)))
        loaded_timesteps.append(t)
    return frames, loaded_timesteps


def _build_slider_figure(data_frames, timesteps) -> go.Figure:
    """Build a Plotly figure with one surface per frame and a slider to switch between them.

    ``data_frames`` must be a non-empty list of equally shaped square 2D arrays
    and ``timesteps`` the matching list of timestep numbers.
    """
    points_per_axis = data_frames[0].shape[0]
    x_coords_plot = np.linspace(-10, 10, points_per_axis)
    y_coords_plot = np.linspace(-10, 10, points_per_axis)

    # Global min/max so every frame shares one colour and z scale.
    z_min = min(np.min(frame) for frame in data_frames)
    z_max = max(np.max(frame) for frame in data_frames)
    if z_max == z_min:
        z_max += 1e-9  # avoid a zero-height axis/colour range for constant data

    fig = go.Figure()
    for i, (t, frame) in enumerate(zip(timesteps, data_frames)):
        fig.add_trace(
            go.Surface(
                z=frame,
                x=x_coords_plot,
                y=y_coords_plot,
                colorscale='Viridis',
                cmin=z_min,
                cmax=z_max,
                name=f'Time: {t}',
                visible=(i == 0),  # only the first trace is visible initially
                # Exactly one trace is visible at a time, so each carries the
                # (shared-range) colorbar; otherwise it vanishes with trace 0.
                showscale=True,
            )
        )

    steps = []
    for i, t in enumerate(timesteps):
        visibility = [j == i for j in range(len(data_frames))]
        steps.append(
            dict(
                method="update",
                args=[{"visible": visibility}],
                # The slider's currentvalue prefix already supplies "Time: ".
                label=str(t),
            )
        )

    sliders = [dict(active=0, currentvalue={"prefix": "Time: "}, pad={"t": 50}, steps=steps)]

    fig.update_layout(
        title='QLBM Simulation - Density Evolution',
        scene=dict(
            xaxis=dict(title=dict(text='X'), range=[x_coords_plot[0], x_coords_plot[-1]]),
            yaxis=dict(title=dict(text='Y'), range=[y_coords_plot[0], y_coords_plot[-1]]),
            zaxis=dict(title=dict(text='Density'), range=[z_min, z_max]),
        ),
        sliders=sliders,
        width=1000,
        height=900,
    )
    return fig


def simulate_qlbm_and_animate(num_reg_qubits: int, T: int, distribution_type: str, ux_input: float, uy_input: float, velocity_field_type: str) -> Optional[go.Figure]:
    """
    Simulates a 2D advection-diffusion problem using a Quantum Lattice Boltzmann Method (QLBM)
    and generates an interactive Plotly figure with a slider for selected time steps.

    Args:
        num_reg_qubits: Qubits per spatial register; the grid is ``2**num_reg_qubits`` per axis.
        T: Total number of timesteps to evolve.
        distribution_type: Key of ``_INITIAL_DISTRIBUTIONS``; unknown values fall back
            to the sine wave with a printed warning.
        ux_input: Uniform advection velocity along x (must satisfy ``|ux| <= 1/3``).
        uy_input: Uniform advection velocity along y (must satisfy ``|uy| <= 1/3``).
        velocity_field_type: Accepted for interface compatibility; only uniform
            advection is implemented, other values print a warning.

    Returns:
        The Plotly figure, or ``None`` when the inputs are invalid or no snapshot
        could be loaded (details are printed to stdout).
    """
    if num_reg_qubits < 1:
        print("Error: num_reg_qubits must be at least 1.")
        return None
    if T < 0:
        print("Error: T must be non-negative.")
        return None
    if max(abs(ux_input), abs(uy_input)) > _MAX_ADVECTION_SPEED:
        print(f"Error: |ux| and |uy| must not exceed 1/3 (got ux={ux_input}, uy={uy_input}); "
              "larger values make the D2Q5 weights negative.")
        return None
    if velocity_field_type != "Uniform":
        print(f"Warning: Velocity field type '{velocity_field_type}' is not implemented. "
              "Using uniform advection (ux, uy).")

    num_anc = 3
    num_qubits_total = 2 * num_reg_qubits + num_anc
    current_N = 2**num_reg_qubits
    N_tot_state_vector = 2**num_qubits_total
    # Single-process run; the rank bookkeeping below is kept from the multi-rank layout.
    num_ranks = 1
    rank = 0
    N_sub_per_rank = int(N_tot_state_vector // num_ranks)

    # Initial state setup
    density_function = _INITIAL_DISTRIBUTIONS.get(distribution_type)
    if density_function is None:
        print(f"Warning: Unknown distribution type '{distribution_type}'. Defaulting to Sine Wave.")
        density_function = _sine_wave_density

    downsampling_factor = _choose_downsampling_factor(current_N)
    downsampled_N = current_N // downsampling_factor
    snapshot_steps = _snapshot_timesteps(T)

    with tempfile.TemporaryDirectory() as tmp_npy_dir:
        intermediate_folder_path = Path(tmp_npy_dir)
        cudaq.set_target('nvidia', option='fp64')

        # Allocates the register only; its state vector is overwritten below.
        @cudaq.kernel
        def alloc_kernel(num_qubits_alloc: int):
            qubits = cudaq.qvector(num_qubits_alloc)

        def to_cupy_array(state):
            """Wrap the device memory of ``state`` in a complex128 CuPy array without copying."""
            tensor = state.getTensor()
            pDevice = tensor.data()
            sizeByte = tensor.get_num_elements() * tensor.get_element_size()
            # owner=state keeps the CUDA-Q state alive as long as the view exists.
            mem = UnownedMemory(pDevice, sizeByte, owner=state)
            memptr_obj = MemoryPointer(mem, 0)
            return cp.ndarray(tensor.get_num_elements(), dtype=cp.complex128, memptr=memptr_obj)

        class QLBMAdvecDiffD2Q5_new:
            """D2Q5 advection-diffusion QLBM stepper built from CUDA-Q kernels."""

            def __init__(self, ux=0.2, uy=0.15) -> None:
                self.dim = 2
                self.ndir = 5
                self.nq_dir = math.ceil(np.log2(self.ndir))
                # Bit strings "000".."100", one per lattice direction.
                self.dirs = [f"{dir_int:b}".zfill(self.nq_dir) for dir_int in range(self.ndir)]
                self.e_unitvec = np.array([0, 1, -1, 1, -1])
                self.wts = np.array([2/6, 1/6, 1/6, 1/6, 1/6])
                self.cs = 1 / np.sqrt(3)
                self.ux = ux
                self.uy = uy
                self.u = np.array([0, self.ux, self.ux, self.uy, self.uy])
                self.wtcoeffs = np.multiply(self.wts, 1 + self.e_unitvec * self.u / self.cs**2)
                self.create_circuit()

            def create_circuit(self):
                """Register the ``prep_op`` unitary, define the kernels and set ``self.run_timestep``."""
                # Reflection 2|v><v| - I about v ~ sqrt(w) + e0: it swaps |0> with the
                # state whose amplitudes are sqrt(wtcoeffs) (which has unit norm).
                v = np.pad(self.wtcoeffs, (0, 2**num_anc - self.ndir))
                v = v**0.5
                v[0] += 1
                v = v / np.linalg.norm(v)
                U_prep = 2 * np.outer(v, v) - np.eye(len(v))
                cudaq.register_operation("prep_op", U_prep)

                # Flattened direction bits, each direction's bit string reversed.
                self.dirs_i_list = [int(c) for dir_str in self.dirs for c in reversed(dir_str)]

                @cudaq.kernel
                def rshift(q: cudaq.qview, n: int):
                    for i in range(n):
                        if i == n - 1:
                            x(q[n - 1 - i])
                        elif i == n - 2:
                            x.ctrl(q[n - 1 - (i + 1)], q[n - 1 - i])
                        else:
                            x.ctrl(q[0:n - 1 - i], q[n - 1 - i])

                @cudaq.kernel
                def lshift(q: cudaq.qview, n: int):
                    for i in range(n):
                        if i == 0:
                            x(q[0])
                        elif i == 1:
                            x.ctrl(q[0], q[1])
                        else:
                            x.ctrl(q[0:i], q[i])

                # For each of the four moving directions: flip the direction qubits whose
                # target bit is 0, apply the shift controlled on the direction register,
                # then undo the flips.
                @cudaq.kernel
                def d2q5_tstep(q: cudaq.qview, nqx: int, nqy: int, nq_dir_val: int, dirs_i_val: list[int]):
                    qx = q[0:nqx]
                    qy = q[nqx:nqx + nqy]
                    qdir = q[nqx + nqy:nqx + nqy + nq_dir_val]

                    idx_lqx = 2
                    b_list = dirs_i_val[idx_lqx * nq_dir_val:(idx_lqx + 1) * nq_dir_val]
                    for j in range(nq_dir_val):
                        if b_list[j] == 0:
                            x(qdir[j])
                    cudaq.control(lshift, qdir, qx, nqx)
                    for j in range(nq_dir_val):
                        if b_list[j] == 0:
                            x(qdir[j])

                    idx_rqx = 1
                    b_list = dirs_i_val[idx_rqx * nq_dir_val:(idx_rqx + 1) * nq_dir_val]
                    for j in range(nq_dir_val):
                        if b_list[j] == 0:
                            x(qdir[j])
                    cudaq.control(rshift, qdir, qx, nqx)
                    for j in range(nq_dir_val):
                        if b_list[j] == 0:
                            x(qdir[j])

                    idx_lqy = 4
                    b_list = dirs_i_val[idx_lqy * nq_dir_val:(idx_lqy + 1) * nq_dir_val]
                    for j in range(nq_dir_val):
                        if b_list[j] == 0:
                            x(qdir[j])
                    cudaq.control(lshift, qdir, qy, nqy)
                    for j in range(nq_dir_val):
                        if b_list[j] == 0:
                            x(qdir[j])

                    idx_rqy = 3
                    b_list = dirs_i_val[idx_rqy * nq_dir_val:(idx_rqy + 1) * nq_dir_val]
                    for j in range(nq_dir_val):
                        if b_list[j] == 0:
                            x(qdir[j])
                    cudaq.control(rshift, qdir, qy, nqy)
                    for j in range(nq_dir_val):
                        if b_list[j] == 0:
                            x(qdir[j])

                @cudaq.kernel
                def d2q5_tstep_wrapper(state_arg: cudaq.State, nqx: int, nqy: int, nq_dir_val: int, dirs_i_val: list[int]):
                    q = cudaq.qvector(state_arg)
                    qdir = q[nqx + nqy:nqx + nqy + nq_dir_val]
                    prep_op(qdir[2], qdir[1], qdir[0])
                    d2q5_tstep(q, nqx, nqy, nq_dir_val, dirs_i_val)
                    prep_op(qdir[2], qdir[1], qdir[0])

                # NOTE(review): this variant is declared to take list[complex] but
                # run_timestep_func passes whatever state it was given; it is only
                # reached when run_evolution is called with observable=True.
                @cudaq.kernel
                def d2q5_tstep_wrapper_hadamard(vec_arg: list[complex], nqx: int, nqy: int, nq_dir_val: int, dirs_i_val: list[int]):
                    q = cudaq.qvector(vec_arg)
                    qdir = q[nqx + nqy:nqx + nqy + nq_dir_val]
                    qy = q[nqx:nqx + nqy]
                    prep_op(qdir[2], qdir[1], qdir[0])
                    d2q5_tstep(q, nqx, nqy, nq_dir_val, dirs_i_val)
                    prep_op(qdir[2], qdir[1], qdir[0])
                    for i in range(nqy):
                        h(qy[i])

                def run_timestep_func(vec_arg, hadamard=False):
                    """Apply one timestep to ``vec_arg`` and zero every amplitude outside the ancilla-0 block."""
                    if hadamard:
                        result = cudaq.get_state(d2q5_tstep_wrapper_hadamard, vec_arg, num_reg_qubits, num_reg_qubits, self.nq_dir, self.dirs_i_list)
                    else:
                        result = cudaq.get_state(d2q5_tstep_wrapper, vec_arg, num_reg_qubits, num_reg_qubits, self.nq_dir, self.dirs_i_list)

                    num_nonzero_ranks = num_ranks / (2**num_anc)
                    rank_slice_cupy = to_cupy_array(result)

                    if rank >= num_nonzero_ranks and num_nonzero_ranks > 0:
                        # This rank holds no ancilla-0 amplitudes: clear its whole slice.
                        sub_sv_zeros = np.zeros(N_sub_per_rank, dtype=np.complex128)
                        cp.cuda.runtime.memcpy(rank_slice_cupy.data.ptr, sub_sv_zeros.ctypes.data, sub_sv_zeros.nbytes, cp.cuda.runtime.memcpyHostToDevice)

                    if rank == 0 and num_nonzero_ranks < 1 and N_sub_per_rank > 0:
                        # Keep only the first N_tot / 2**num_anc amplitudes.
                        limit_idx = int(N_tot_state_vector / (2**num_anc))
                        if limit_idx < rank_slice_cupy.size:
                            rank_slice_cupy[limit_idx:] = 0

                    return result

                self.run_timestep = run_timestep_func

            def write_state(self, state_to_write, t_step_str_val):
                """Save the (downsampled) real part of the grid amplitudes to ``{t_step_str_val}_{rank}.npy``.

                Nothing is written when this rank holds no grid data, so no empty
                files are left behind for the loader to trip over.
                """
                rank_slice_cupy = to_cupy_array(state_to_write)
                num_nonzero_ranks = num_ranks / (2**num_anc)

                holds_data = rank < num_nonzero_ranks or (rank == 0 and num_nonzero_ranks <= 0)
                if not holds_data:
                    return

                data_limit = N_sub_per_rank
                if num_nonzero_ranks < 1 and rank == 0:
                    data_limit = int(N_tot_state_vector / (2**num_anc))

                if data_limit > 0:
                    relevant_part_cupy = cp.real(rank_slice_cupy[:data_limit])
                else:
                    relevant_part_cupy = cp.array([], dtype=cp.float64)

                grid_points = current_N * current_N
                if relevant_part_cupy.size >= grid_points:
                    arr_to_save = relevant_part_cupy[:grid_points]
                    if downsampling_factor > 1:
                        grid = arr_to_save.reshape((current_N, current_N))
                        arr_to_save = grid[::downsampling_factor, ::downsampling_factor].flatten()
                elif relevant_part_cupy.size > 0:
                    # Partial slice: thin it in 1D instead.
                    arr_to_save = relevant_part_cupy[::downsampling_factor] if downsampling_factor > 1 else relevant_part_cupy
                else:
                    return

                if arr_to_save.size == 0:
                    return

                save_path = intermediate_folder_path / f"{t_step_str_val}_{rank}.npy"
                np.save(save_path, cp.asnumpy(arr_to_save))

            def run_evolution(self, initial_state_arg, total_timesteps, observable=False):
                """Evolve ``initial_state_arg`` for ``total_timesteps`` steps, saving scheduled snapshots.

                The final state is stored in ``self.final_state``. With
                ``observable=True`` the last step uses the Hadamard variant and is
                saved under ``"{T}_h"`` instead of the regular snapshot name.
                """
                current_state_val = initial_state_arg
                self.write_state(current_state_val, '0')  # Save initial state

                steps_to_save = set(_snapshot_timesteps(total_timesteps))

                for t_iter in range(total_timesteps):
                    step_number = t_iter + 1
                    if t_iter == total_timesteps - 1 and observable:
                        next_state_val = self.run_timestep(current_state_val, True)
                        self.write_state(next_state_val, str(step_number) + "_h")
                    else:
                        next_state_val = self.run_timestep(current_state_val)
                        if step_number in steps_to_save:
                            self.write_state(next_state_val, str(step_number))

                    if rank == 0 and step_number % 10 == 0:
                        print(f"Timestep: {step_number}/{total_timesteps}")

                    cp.get_default_memory_pool().free_all_blocks()
                    current_state_val = next_state_val

                if rank == 0:
                    print(f"Timestep: {total_timesteps}/{total_timesteps} (Evolution complete)")

                cp.get_default_memory_pool().free_all_blocks()
                self.final_state = current_state_val

        qlbm_obj = QLBMAdvecDiffD2Q5_new(ux=ux_input, uy=uy_input)
        initial_state_val = cudaq.get_state(alloc_kernel, num_qubits_total)

        # Evaluate the initial density on the full N x N grid.
        xv_init = np.arange(current_N)
        yv_init = np.arange(current_N)
        initial_grid_2d_X, initial_grid_2d_Y = np.meshgrid(xv_init, yv_init)
        initial_grid_2d = density_function(initial_grid_2d_X, initial_grid_2d_Y, current_N)
        sub_sv_init_flat = np.asarray(initial_grid_2d).flatten().astype(np.complex128)

        # The grid occupies the first N*N amplitudes; the remaining ones stay zero.
        full_initial_sv_host = np.zeros(N_sub_per_rank, dtype=np.complex128)
        full_initial_sv_host[:sub_sv_init_flat.size] = sub_sv_init_flat

        rank_slice_init = to_cupy_array(initial_state_val)
        print(f'Rank {rank}: Initializing state with {distribution_type} (ux={ux_input}, uy={uy_input})...')
        cp.cuda.runtime.memcpy(rank_slice_init.data.ptr, full_initial_sv_host.ctypes.data, full_initial_sv_host.nbytes, cp.cuda.runtime.memcpyHostToDevice)
        print(f'Rank {rank}: Initial state copied. Size: {len(sub_sv_init_flat)}. N_sub_per_rank: {N_sub_per_rank}')

        print("Starting QLBM evolution...")
        qlbm_obj.run_evolution(initial_state_val, T)
        print("QLBM evolution complete.")

        print("Generating plots with Plotly...")
        # Snapshots must be read before the temporary directory is removed.
        data_frames, actual_timesteps_loaded = _load_snapshots(
            intermediate_folder_path, snapshot_steps, rank, downsampled_N
        )

    if not data_frames:
        print("Error: No data frames loaded for interactive plot.")
        return None

    return _build_slider_figure(data_frames, actual_timesteps_loaded)


# Gradio Interface Definition
def qlbm_gradio_interface(num_reg_qubits_input: int, timescale_input: int, distribution_type_param: str, ux_param: float, uy_param: float, velocity_field_type_param: str):
    """Gradio callback: coerce the widget values, run the simulation and return its figure.

    Returns ``None`` (after raising a ``gr.Warning`` toast) when the simulation
    produced no figure.
    """
    num_reg_qubits_val = int(num_reg_qubits_input)
    timescale_val = int(timescale_input)
    ux_val = float(ux_param)
    uy_val = float(uy_param)

    print(f"Gradio Interface: num_reg_qubits={num_reg_qubits_val}, T={timescale_val}, Distribution={distribution_type_param}, ux={ux_val}, uy={uy_val}, VelocityFieldType={velocity_field_type_param}")

    plot_fig = simulate_qlbm_and_animate(
        num_reg_qubits=num_reg_qubits_val,
        T=timescale_val,
        distribution_type=distribution_type_param,
        ux_input=ux_val,
        uy_input=uy_val,
        velocity_field_type=velocity_field_type_param,
    )

    if plot_fig is None:
        gr.Warning("Simulation or plotting failed. Please check console for errors.")
        return None

    return plot_fig


with gr.Blocks(theme=gr.themes.Soft(), title="QLBM Simulation with Plotly") as qlbm_demo:
    gr.Markdown(_INTRO_MARKDOWN)

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("## Simulation Parameters")
            num_reg_qubits_slider = gr.Slider(
                minimum=2, maximum=10, value=8, step=1,
                label="Number of Register Qubits (num_reg_qubits)",
                info="Grid N = 2^num_reg_qubits. Max 10 (Note: >8 slow; >9 may hit simulator/memory limits on free tiers)."
            )
            timescale_slider = gr.Slider(
                minimum=0, maximum=2000, value=100, step=10,
                label="Timescale (T)",
                info="Total number of timesteps. Max 2000."
            )

            with gr.Accordion("Initial Conditions", open=True):
                distribution_options = ["Sine Wave (Original)", "Gaussian", "Random"]
                distribution_type_input = gr.Radio(
                    choices=distribution_options,
                    value="Sine Wave (Original)",
                    label="Initial Distribution Type",
                    info="Select the initial pattern of the substance."
                )

            with gr.Accordion("Velocity Fields", open=True):
                velocity_field_options = ["Uniform", "Vortex", "Shear"]
                velocity_field_type_input = gr.Radio(
                    choices=velocity_field_options,
                    value="Uniform",
                    label="Velocity Field Type",
                    info="Select the type of background velocity field. Only 'Uniform' is currently implemented; other choices fall back to it."
                )
                # Bounds stay inside |u| <= 1/3, beyond which the D2Q5 weights go negative.
                ux_slider = gr.Slider(
                    minimum=-0.33, maximum=0.33, value=0.2, step=0.01,
                    label="Advection Velocity ux",
                    info="x-component of background advection."
                )
                uy_slider = gr.Slider(
                    minimum=-0.33, maximum=0.33, value=0.15, step=0.01,
                    label="Advection Velocity uy",
                    info="y-component of background advection."
                )

            run_qlbm_btn = gr.Button("Run QLBM Simulation", variant="primary")

        with gr.Column(scale=2):
            qlbm_interactive_plot = gr.Plot(label="Interactive Density Plot with Slider")

    qlbm_inputs_list = [num_reg_qubits_slider, timescale_slider, distribution_type_input, ux_slider, uy_slider, velocity_field_type_input]

    run_qlbm_btn.click(
        fn=qlbm_gradio_interface,
        inputs=qlbm_inputs_list,
        outputs=[qlbm_interactive_plot]
    )

    gr.Examples(
        examples=[[6, 50, "Gaussian", 0.1, 0.05, "Uniform"]],
        inputs=qlbm_inputs_list,
        outputs=[qlbm_interactive_plot],
        fn=qlbm_gradio_interface,
        cache_examples=False
    )

if __name__ == "__main__":
    try:
        cudaq.set_target('nvidia', option='fp64')
        print(f"CUDA-Q Target successfully set to: {cudaq.get_target().name}")
    except Exception as e_target:
        # Best effort: report and continue with whatever target CUDA-Q selected.
        print(f"Warning: Could not set CUDA-Q target to 'nvidia'. Error: {e_target}")
        print(f"Current CUDA-Q Target: {cudaq.get_target().name}. Performance may be affected.")
    qlbm_demo.launch()