Spaces:
Build error
Build error
| import os | |
| import sys | |
| import time | |
| import math | |
| import threading | |
| import tempfile | |
| import gradio as gr | |
| import cudaq | |
| import numpy as np | |
| import cupy as cp | |
| # from scipy import interpolate # Not used | |
| from pathlib import Path | |
| from matplotlib import pyplot as plt | |
| from matplotlib.animation import FuncAnimation | |
def simulate_qlbm_and_animate(num_reg_qubits: int, T: int, distribution_type: str, ux_input: float, uy_input: float):
    """
    Simulate a 2D advection-diffusion problem with a D2Q5 Quantum Lattice
    Boltzmann Method (QLBM) on the CUDA-Q state-vector simulator and render
    the evolution as a GIF animation.

    Args:
        num_reg_qubits (int): Register qubits per axis (grid size N = 2**num_reg_qubits).
        T (int): Total number of timesteps to run.
        distribution_type (str): Initial distribution name ("Sine Wave (Original)",
            "Gaussian" or "Random"); any other value falls back to the sine wave.
        ux_input (float): Advection velocity in the x-direction.
        uy_input (float): Advection velocity in the y-direction.

    Returns:
        str: Path of the generated GIF, or None when the inputs are rejected,
        no frame could be produced, or the animation could not be saved.
    """
    # ---- Validate inputs before any GPU resource is touched ----
    if num_reg_qubits < 1:
        print(f"Error: num_reg_qubits must be >= 1, got {num_reg_qubits}.")
        return None
    if T <= 0:
        print(f"Warning: T={T}, no frames to plot for animation.")
        return None
    # The link weights are w_i * (1 +/- u / cs^2) with cs^2 = 1/3. For |u| > cs^2
    # one weight turns negative, its square root is NaN and the state-preparation
    # matrix built from it is no longer a valid unitary.
    cs_squared = (1 / math.sqrt(3)) ** 2
    if abs(ux_input) > cs_squared or abs(uy_input) > cs_squared:
        print(f"Error: advection velocities must satisfy |u| <= {cs_squared:.4f} "
              f"(got ux={ux_input}, uy={uy_input}); link weights would become negative.")
        return None

    # ---- Derived sizes ----
    simulation_fps = 0.1
    frames = max(1, int(simulation_fps * T))
    num_anc = 3
    num_qubits_total = 2 * num_reg_qubits + num_anc
    current_N = 2**num_reg_qubits
    N_tot_state_vector = 2**num_qubits_total
    num_ranks = 1
    rank = 0
    N_sub_per_rank = int(N_tot_state_vector // num_ranks)
    # Fraction of ranks holding the ancilla-zero part of the state vector.
    num_nonzero_ranks = num_ranks / (2**num_anc)
    # Number of amplitudes belonging to the N x N grid (all ancillas zero).
    computational_size = N_tot_state_vector // (2**num_anc)
    timesteps_per_frame = max(1, int(T / frames)) if frames < T else 1

    # Stride used when writing frames. A fixed stride of 32 collapses every
    # grid with N <= 32 to a single sample, which cannot be drawn as a surface;
    # keep roughly 8 samples per axis instead and cap the stride at 32.
    downsampling_factor = max(1, min(2**5, current_N // 8))
    downsampled_N = max(1, current_N // downsampling_factor)

    # ---- Initial distribution ----
    def sine_wave(x, y, n):
        return np.sin(x * 2 * np.pi / n) * (1 - 0.5 * x / n) * \
            np.sin(y * 4 * np.pi / n) * (1 - 0.5 * y / n) + 1

    def gaussian(x, y, n):
        return np.exp(-((x - n / 2)**2 / (2 * (n / 5)**2) +
                        (y - n / 2)**2 / (2 * (n / 5)**2))) * 1.8 + 0.2

    def uniform_random(x, y, n):
        return np.random.rand(x.shape[0], x.shape[1]) * 1.5 + 0.2

    known_distributions = {
        "Sine Wave (Original)": sine_wave,
        "Gaussian": gaussian,
        "Random": uniform_random,
    }
    selected_initial_state_function_raw = known_distributions.get(distribution_type)
    if selected_initial_state_function_raw is None:
        print(f"Warning: Unknown distribution type '{distribution_type}'. Defaulting to Sine Wave.")
        selected_initial_state_function_raw = sine_wave

    def initial_state_func_eval(x_coords, y_coords):
        return selected_initial_state_function_raw(x_coords, y_coords, current_N) * \
            (y_coords < current_N).astype(int)

    data = []
    actual_timesteps_for_title = []

    # Intermediate frames are written as .npy files into a temporary directory
    # that is removed once all frames have been loaded back.
    with tempfile.TemporaryDirectory() as tmp_npy_dir:
        intermediate_folder_path = Path(tmp_npy_dir)
        cudaq.set_target('nvidia', option='mgpu,fp64')

        # Kernel that only allocates the register; used to obtain a state
        # object whose device memory is then overwritten with the initial data.
        @cudaq.kernel
        def alloc_kernel(num_qubits_alloc: int):
            qubits = cudaq.qvector(num_qubits_alloc)

        from cupy.cuda.memory import MemoryPointer, UnownedMemory

        def to_cupy_array(state):
            """Wrap the state's device buffer as a 1-D complex128 CuPy array (no copy)."""
            tensor = state.getTensor()
            pDevice = tensor.data()
            sizeByte = tensor.get_num_elements() * tensor.get_element_size()
            # owner=state ties the lifetime of the view to the state object.
            mem = UnownedMemory(pDevice, sizeByte, owner=state)
            memptr_obj = MemoryPointer(mem, 0)
            cupy_array = cp.ndarray(tensor.get_num_elements(),
                                    dtype=cp.complex128,
                                    memptr=memptr_obj)
            return cupy_array

        class QLBMAdvecDiffD2Q5_new:
            """D2Q5 advection-diffusion QLBM: builds the kernels and drives the time loop."""

            def __init__(self, ux=0.2, uy=0.15) -> None:
                self.dim = 2
                self.ndir = 5
                self.nq_dir = math.ceil(np.log2(self.ndir))
                # Binary labels of the five directions, zero-padded to nq_dir bits.
                self.dirs = []
                for dir_int in range(self.ndir):
                    dir_bin = f"{dir_int:b}".zfill(self.nq_dir)
                    self.dirs.append(dir_bin)
                self.e_unitvec = np.array([0, 1, -1, 1, -1])
                self.wts = np.array([2/6, 1/6, 1/6, 1/6, 1/6])
                self.cs = 1 / np.sqrt(3)
                self.ux = ux
                self.uy = uy
                self.u = np.array([0, self.ux, self.ux, self.uy, self.uy])
                self.wtcoeffs = np.multiply(self.wts, 1 + self.e_unitvec * self.u / self.cs**2)
                self.create_circuit()

            def create_circuit(self):
                """Register the preparation operation and define the timestep kernels."""
                # U_prep = 2 v v^T - I with v normalised; v is built from the
                # square roots of the (zero-padded) link weights.
                v = np.pad(self.wtcoeffs, (0, 2**num_anc - self.ndir))
                v = v**0.5
                v[0] += 1
                v = v / np.linalg.norm(v)
                U_prep = 2 * np.outer(v, v) - np.eye(len(v))
                cudaq.register_operation("prep_op", U_prep)

                def collisionOp(dirs):
                    # Flatten the direction labels into one list of bits, each
                    # label reversed.
                    dirs_i_list = []
                    for dir_ in dirs:
                        dirs_i = [(int(c)) for c in dir_]
                        dirs_i_list += dirs_i[::-1]
                    return dirs_i_list

                self.dirs_i_list = collisionOp(self.dirs)

                # Cascade of (multi-)controlled X gates from the highest qubit
                # down to an unconditional X on q[0].
                # NOTE(review): presumably a cyclic +1 shift of the register
                # when q[0] is the least-significant bit - confirm.
                @cudaq.kernel
                def rshift(q: cudaq.qview, n: int):
                    for i in range(n):
                        if i == n - 1:
                            x(q[n - 1 - i])
                        elif i == n - 2:
                            x.ctrl(q[n - 1 - (i + 1)], q[n - 1 - i])
                        else:
                            x.ctrl(q[0:n - 1 - i], q[n - 1 - i])

                # Same gates as rshift applied in the opposite order (X on q[0]
                # first). NOTE(review): presumably the inverse shift - confirm.
                @cudaq.kernel
                def lshift(q: cudaq.qview, n: int):
                    for i in range(n):
                        if i == 0:
                            x(q[0])
                        elif i == 1:
                            x.ctrl(q[0], q[1])
                        else:
                            x.ctrl(q[0:i], q[i])

                # For each of the directions 2, 1, 4, 3: flip the direction
                # qubits whose label bit is 0, apply the shift controlled on the
                # direction register (lshift/rshift on qx for 2/1, on qy for
                # 4/3), then undo the flips.
                @cudaq.kernel
                def d2q5_tstep(q: cudaq.qview, nqx: int, nqy: int, nq_dir: int, dirs_i: list[int]):
                    qx = q[0:nqx]
                    qy = q[nqx:nqx + nqy]
                    qdir = q[nqx + nqy:nqx + nqy + nq_dir]
                    i = 2
                    b_list = dirs_i[i * nq_dir:(i + 1) * nq_dir]
                    for j in range(nq_dir):
                        b = b_list[j]
                        if b == 0:
                            x(qdir[j])
                    cudaq.control(lshift, qdir, qx, nqx)
                    for j in range(nq_dir):
                        b = b_list[j]
                        if b == 0:
                            x(qdir[j])
                    i = 1
                    b_list = dirs_i[i * nq_dir:(i + 1) * nq_dir]
                    for j in range(nq_dir):
                        b = b_list[j]
                        if b == 0:
                            x(qdir[j])
                    cudaq.control(rshift, qdir, qx, nqx)
                    for j in range(nq_dir):
                        b = b_list[j]
                        if b == 0:
                            x(qdir[j])
                    i = 4
                    b_list = dirs_i[i * nq_dir:(i + 1) * nq_dir]
                    for j in range(nq_dir):
                        b = b_list[j]
                        if b == 0:
                            x(qdir[j])
                    cudaq.control(lshift, qdir, qy, nqy)
                    for j in range(nq_dir):
                        b = b_list[j]
                        if b == 0:
                            x(qdir[j])
                    i = 3
                    b_list = dirs_i[i * nq_dir:(i + 1) * nq_dir]
                    for j in range(nq_dir):
                        b = b_list[j]
                        if b == 0:
                            x(qdir[j])
                    cudaq.control(rshift, qdir, qy, nqy)
                    for j in range(nq_dir):
                        b = b_list[j]
                        if b == 0:
                            x(qdir[j])

                # One full timestep starting from an existing state:
                # prep_op, the four controlled shifts, prep_op again.
                @cudaq.kernel
                def d2q5_tstep_wrapper(state_arg: cudaq.State, nqx: int, nqy: int, nq_dir: int, dirs_i: list[int]):
                    q = cudaq.qvector(state_arg)
                    qdir = q[nqx + nqy:nqx + nqy + nq_dir]
                    prep_op(qdir[2], qdir[1], qdir[0])
                    d2q5_tstep(q, nqx, nqy, nq_dir, dirs_i)
                    prep_op(qdir[2], qdir[1], qdir[0])

                # Same timestep followed by a Hadamard on every qy qubit.
                @cudaq.kernel
                def d2q5_tstep_wrapper_hadamard(vec: list[complex], nqx: int, nqy: int, nq_dir: int, dirs_i: list[int]):
                    q = cudaq.qvector(vec)
                    qdir = q[nqx + nqy:nqx + nqy + nq_dir]
                    qy = q[nqx:nqx + nqy]
                    prep_op(qdir[2], qdir[1], qdir[0])
                    d2q5_tstep(q, nqx, nqy, nq_dir, dirs_i)
                    prep_op(qdir[2], qdir[1], qdir[0])
                    for i in range(nqy):
                        h(qy[i])

                def run_timestep_func(vec, hadamard=False):
                    """Advance `vec` by one timestep and zero the ancilla-nonzero amplitudes."""
                    if hadamard:
                        result = cudaq.get_state(d2q5_tstep_wrapper_hadamard, vec, num_reg_qubits, num_reg_qubits, self.nq_dir, self.dirs_i_list)
                    else:
                        result = cudaq.get_state(d2q5_tstep_wrapper, vec, num_reg_qubits, num_reg_qubits, self.nq_dir, self.dirs_i_list)
                    rank_slice_cupy = to_cupy_array(result)
                    if rank >= num_nonzero_ranks:
                        # This rank holds no grid amplitudes: clear its slice.
                        sub_sv_zeros = np.zeros(N_sub_per_rank, dtype=np.complex128)
                        cp.cuda.runtime.memcpy(rank_slice_cupy.data.ptr, sub_sv_zeros.ctypes.data, sub_sv_zeros.nbytes, cp.cuda.runtime.memcpyHostToDevice)
                    if rank == 0 and num_nonzero_ranks < 1:
                        # Keep only the leading grid amplitudes; round-trip
                        # through host memory and write back in place.
                        sub_sv_get = (rank_slice_cupy).get()
                        sub_sv_get[computational_size:] = 0
                        cp.cuda.runtime.memcpy(rank_slice_cupy.data.ptr, sub_sv_get.ctypes.data, sub_sv_get.nbytes, cp.cuda.runtime.memcpyHostToDevice)
                    return result

                self.run_timestep = run_timestep_func

            def write_state(self, state_to_write, t_step):
                """Save the downsampled real part of the grid amplitudes as '<t_step>_<rank>.npy'."""
                rank_slice_cupy = to_cupy_array(state_to_write)
                if rank < num_nonzero_ranks:
                    save_path = intermediate_folder_path / f"{t_step}_{rank}.npy"
                    with open(save_path, 'wb') as f:
                        if num_nonzero_ranks < 1:
                            arr_to_save = cp.real(rank_slice_cupy)[:computational_size]
                        else:
                            arr_to_save = cp.real(rank_slice_cupy)
                        if len(arr_to_save) > current_N:
                            # View as the N x N grid and keep every
                            # downsampling_factor-th sample along both axes.
                            arr_to_save = arr_to_save.reshape((current_N, current_N))
                            arr_to_save = arr_to_save[::downsampling_factor, ::downsampling_factor]
                            arr_to_save = arr_to_save.flatten()
                        elif len(arr_to_save) > 0:
                            arr_to_save = arr_to_save[::downsampling_factor]
                        np.save(f, arr_to_save.get() if isinstance(arr_to_save, cp.ndarray) else arr_to_save)

            def run_evolution(self, initial_state_arg, total_timesteps, observable=False):
                """Run `total_timesteps` steps, writing a frame every `timesteps_per_frame` steps.

                With `observable=True` the last step uses the Hadamard variant
                and is written under the name '<T>_h'. The final state is
                stored in `self.final_state`.
                """
                current_state = initial_state_arg
                for t_iter in range(total_timesteps):
                    if t_iter == total_timesteps - 1 and observable:
                        next_state = self.run_timestep(current_state, True)
                        self.write_state(next_state, str(t_iter + 1) + "_h")
                    else:
                        next_state = self.run_timestep(current_state)
                        if (t_iter + 1) % timesteps_per_frame == 0:
                            self.write_state(next_state, t_iter + 1)
                    if rank == 0:
                        print(f"Timestep: {t_iter + 1}/{total_timesteps}")
                    cp.get_default_memory_pool().free_all_blocks()
                    current_state = next_state

                # Make sure the very last state is on disk even when T is not a
                # multiple of timesteps_per_frame. In observable mode the loop
                # has already written '<T>_h', so nothing is written here.
                last_saved_timestep_name = str(total_timesteps) if not observable else str(total_timesteps) + "_h"
                final_state_file_path = intermediate_folder_path.joinpath(f"{last_saved_timestep_name}_{rank}.npy")
                if total_timesteps > 0 and not final_state_file_path.exists():
                    if not observable:
                        self.write_state(current_state, total_timesteps)
                    if rank == 0:
                        print(f"Timestep: {total_timesteps}/{total_timesteps} (Evolution complete)")
                    cp.get_default_memory_pool().free_all_blocks()
                self.final_state = current_state

        qlbm_obj = QLBMAdvecDiffD2Q5_new(ux=ux_input, uy=uy_input)

        # ---- Build the initial state directly in device memory ----
        initial_state = cudaq.get_state(alloc_kernel, num_qubits_total)
        rank_slice_init = to_cupy_array(initial_state)
        xv_init = np.arange(current_N)
        yv_init = np.arange(current_N)
        print(f'Start initializing state with {distribution_type} distribution (ux={ux_input}, uy={uy_input})...')
        initial_grid_2d = initial_state_func_eval(*np.meshgrid(xv_init, yv_init))
        sub_sv_init_flat = initial_grid_2d.flatten().astype(np.complex128)
        # Grid values occupy the leading N*N amplitudes; the rest stay zero.
        full_initial_sv_host = np.zeros(N_sub_per_rank, dtype=np.complex128)
        full_initial_sv_host[:len(sub_sv_init_flat)] = sub_sv_init_flat
        print(f'Rank {rank}: Initial state (N*N part) prepared. Size: {len(sub_sv_init_flat)}. Total N_sub_per_rank: {N_sub_per_rank}')
        cp.cuda.runtime.memcpy(rank_slice_init.data.ptr, full_initial_sv_host.ctypes.data, full_initial_sv_host.nbytes, cp.cuda.runtime.memcpyHostToDevice)
        print(f'Rank {rank}: Initial state copied to device.')

        print("Starting QLBM evolution...")
        qlbm_obj.run_evolution(initial_state, T)
        print("QLBM evolution complete.")
        print("Generating animation...")

        # ---- Collect the names of the frames that were actually written ----
        plotted_timesteps_str = []
        for t_step in range(timesteps_per_frame, T + 1, timesteps_per_frame):
            if intermediate_folder_path.joinpath(f"{t_step}_0.npy").exists():
                plotted_timesteps_str.append(str(t_step))
        # The final timestep is always shown; prefer the '_h' variant if present.
        final_T_path = intermediate_folder_path.joinpath(f"{T}_0.npy")
        final_Th_path = intermediate_folder_path.joinpath(f"{T}_h_0.npy")
        if final_Th_path.exists():
            plotted_timesteps_str.append(str(T) + "_h")
        elif final_T_path.exists():
            plotted_timesteps_str.append(str(T))
        # De-duplicate and order by numeric timestep.
        plotted_timesteps_str = sorted(set(plotted_timesteps_str), key=lambda k: (int(str(k).replace('_h', '')), str(k)))

        # ---- Load frames back before the temporary directory disappears ----
        for i_str in plotted_timesteps_str:
            file_path = intermediate_folder_path / f"{i_str}_0.npy"
            try:
                sol = np.load(file_path)
            except FileNotFoundError:
                print(f"Warning: File not found for timestep {i_str} ({file_path}). Skipping this frame.")
                continue
            except Exception as e:
                print(f"Error loading or reshaping file for timestep {i_str}: {e}. Skipping this frame.")
                continue
            if sol.size != downsampled_N * downsampled_N:
                print(f"Warning: File {file_path} has unexpected size {sol.size}. Expected {downsampled_N*downsampled_N}. Skipping this frame.")
                continue
            data.append(np.reshape(sol, (downsampled_N, downsampled_N)))
            actual_timesteps_for_title.append(str(i_str).replace('_h', ''))

    if not data:
        print("Error: No data available to create animation. Check .npy file generation and paths.")
        return None

    # ---- Render the animation ----
    fig = plt.figure(figsize=(10, 8))
    ax = fig.add_subplot(111, projection='3d')
    x_plot = np.linspace(-10, 10, downsampled_N)
    y_plot = np.linspace(-10, 10, downsampled_N)
    X_plot, Y_plot = np.meshgrid(x_plot, y_plot)
    # All frames are scaled by the norm of the first frame.
    norm_factor = np.linalg.norm(data[0]) if data[0].size > 0 else 1.0
    if norm_factor == 0:
        norm_factor = 1.0

    def update_frame(frame_idx):
        ax.clear()
        Z = data[frame_idx]
        surf = ax.plot_surface(X_plot, Y_plot, Z / norm_factor, cmap='viridis', linewidth=0, antialiased=False)
        current_timestep_for_title = actual_timesteps_for_title[frame_idx] if frame_idx < len(actual_timesteps_for_title) else "Unknown"
        ax.set_title(f'Quantum Simulation Evolution (Timestep: {current_timestep_for_title})')
        ax.set_xlabel('x')
        ax.set_ylabel('y')
        ax.set_zlim(0, 0.6)
        return surf,

    gif_animation_fps = 10
    ani = FuncAnimation(fig, update_frame, frames=len(data), blit=False)

    results_base_dir = "Results"
    gif_frames_for_naming = frames
    # Sanitize distribution_type for use in a folder name.
    dist_name_part = distribution_type.replace(' ', '').replace('(Original)', '').replace('(', '').replace(')', '')
    specific_folder_name = (f"d2q5_nq{current_N}x{current_N}_T{T}_fr{gif_frames_for_naming}_"
                            f"dist{dist_name_part}_ux{ux_input:.2f}_uy{uy_input:.2f}")
    final_output_dir = Path(results_base_dir) / specific_folder_name
    os.makedirs(final_output_dir, exist_ok=True)
    gif_path_to_return = final_output_dir / "animation.gif"
    try:
        ani.save(str(gif_path_to_return), writer='pillow', fps=gif_animation_fps)
        print(f"Animation saved to {gif_path_to_return}")
    except Exception as e:
        print(f"Error saving animation: {e}")
        return None
    finally:
        # Always release the figure, whether or not saving succeeded.
        plt.close(fig)
    return str(gif_path_to_return)
| # --- Gradio Interface Definition --- | |
def qlbm_gradio_interface(num_reg_qubits_input: int, timescale_input: int, distribution_type_param: str, ux_param: float, uy_param: float):
    """
    Gradio callback: coerce the widget values to plain Python numbers, run the
    simulation and hand back the GIF path.

    Args:
        num_reg_qubits_input: Slider value for the number of register qubits.
        timescale_input: Slider value for the total number of timesteps.
        distribution_type_param: Name of the selected initial distribution.
        ux_param: Slider value for the x advection velocity.
        uy_param: Slider value for the y advection velocity.

    Returns:
        The GIF file path returned by `simulate_qlbm_and_animate`, or None
        (after raising a Gradio warning) when the simulation returned None.
    """
    # Widget values are converted explicitly to int/float before use.
    sim_args = {
        "num_reg_qubits": int(num_reg_qubits_input),
        "T": int(timescale_input),
        "distribution_type": distribution_type_param,
        "ux_input": float(ux_param),
        "uy_input": float(uy_param),
    }
    print(f"Gradio Interface: num_reg_qubits={sim_args['num_reg_qubits']}, T={sim_args['T']}, Distribution={distribution_type_param}, ux={sim_args['ux_input']}, uy={sim_args['uy_input']}")

    animation_path = simulate_qlbm_and_animate(**sim_args)
    if animation_path is not None:
        return animation_path

    gr.Warning("Animation generation failed. Please check console for errors.")
    return None
# --- Page layout: intro text, parameter column (left), animation output (right) ---
with gr.Blocks(title="QLBM Simulation", theme=gr.themes.Soft()) as qlbm_demo:
    gr.Markdown(
        """
        # ⚛️ Quantum Lattice Boltzmann Method (QLBM) Simulator
        Welcome to the Quantum Lattice Boltzmann Method (QLBM) simulator!
        **What is QLBM?**
        The Lattice Boltzmann Method (LBM) is a computational fluid dynamics technique for simulating fluid flow. QLBM is its quantum counterpart, leveraging quantum algorithms to potentially offer advantages for certain types of simulations. It's used here to model advection-diffusion phenomena, which describe how substances are transported due to bulk motion (advection) and spread out due to random motion (diffusion).
        **How this Simulator Works:**
        This simulator implements a D2Q5 model (2 dimensions, 5 velocity directions) on a quantum computer simulator (using CUDA-Q).
        - You can control the grid size (via Number of Register Qubits, where grid size is $2^N \\times 2^N$).
        - You can set the total simulation time (Timescale T).
        - You can choose the initial distribution of the substance on the grid.
        - You can set the advection velocities `ux` (x-direction) and `uy` (y-direction).
        The simulation will then evolve this initial state over time, and an animation of the substance's density will be generated.
        **Note:** Simulations with higher qubit counts and longer timescales can be computationally intensive and may take a significant amount of time to complete.
        The $N$ in $2^N \\times 2^N$ grid refers to `num_reg_qubits`. Advection velocities `ux` and `uy` should generally be kept small (e.g., < 0.3) for model stability.
        """
    )
    with gr.Row():
        # Left column: every input widget plus the run button.
        with gr.Column(scale=1):
            gr.Markdown("## Simulation Parameters")
            num_reg_qubits_slider = gr.Slider(
                label="Number of Register Qubits (num_reg_qubits)",
                info="Determines the N for grid size (2^N x 2^N). Max 11 (Note: >8 can be very slow).",
                minimum=2,
                maximum=11,
                step=1,
                value=8,
            )
            timescale_slider = gr.Slider(
                label="Timescale (T)",
                info="Total number of timesteps. Max 2000 (Note: higher values increase run time significantly).",
                minimum=10,
                maximum=2000,
                step=10,
                value=100,
            )
            distribution_options = ["Sine Wave (Original)", "Gaussian", "Random"]
            distribution_type_input = gr.Radio(
                label="Initial Distribution Type",
                info="Select the initial pattern of the substance on the grid.",
                choices=distribution_options,
                value="Sine Wave (Original)",
            )
            ux_slider = gr.Slider(
                label="Advection Velocity ux",
                info="x-component of the background advection velocity.",
                minimum=-0.4,
                maximum=0.4,
                step=0.01,
                value=0.2,
            )
            uy_slider = gr.Slider(
                label="Advection Velocity uy",
                info="y-component of the background advection velocity.",
                minimum=-0.4,
                maximum=0.4,
                step=0.01,
                value=0.15,
            )
            run_qlbm_btn = gr.Button("Run QLBM Simulation", variant="primary")
        # Right column: the generated GIF, passed around as a file path.
        with gr.Column(scale=3):
            qlbm_plot_output = gr.Image(label="QLBM Simulation Animation", type="filepath")

    # Input order must match the positional parameters of qlbm_gradio_interface.
    qlbm_inputs_list = [num_reg_qubits_slider, timescale_slider, distribution_type_input, ux_slider, uy_slider]
    run_qlbm_btn.click(
        fn=qlbm_gradio_interface,
        inputs=qlbm_inputs_list,
        outputs=qlbm_plot_output,
    )

    # Preset rows: [num_reg_qubits, T, distribution, ux, uy].
    _qlbm_example_rows = [
        [8, 100, "Sine Wave (Original)", 0.2, 0.15],
        [6, 50, "Gaussian", 0.1, 0.05],
        [4, 30, "Random", -0.05, 0.1],
        [7, 70, "Sine Wave (Original)", 0.0, 0.0],  # Example with no advection
    ]
    gr.Examples(
        examples=_qlbm_example_rows,
        fn=qlbm_gradio_interface,
        inputs=qlbm_inputs_list,
        outputs=qlbm_plot_output,
        cache_examples=False,
    )

if __name__ == "__main__":
    qlbm_demo.launch()