Spaces:
Sleeping
Sleeping
| import os | |
| import math | |
| import tempfile | |
| import gradio as gr | |
| import cudaq | |
| import numpy as np | |
| import cupy as cp | |
| from pathlib import Path | |
| # --- Plotly Imports --- | |
| import plotly.graph_objects as go | |
| import plotly.io as pio | |
| import imageio # For compiling GIF from frames | |
# Set a default Plotly engine for image export
# Best-effort tweak of the Kaleido export scope; runs once at import time.
try:
    pio.kaleido.scope.mathjax = None # Can speed up Kaleido if MathJax not needed
except AttributeError:
    # The attribute chain is not available in this environment; the tweak is
    # optional, so carry on without it.
    # NOTE(review): presumably this happens when Kaleido is missing or exposes
    # a different API - confirm against the installed plotly/kaleido versions.
    pass
def simulate_qlbm_and_animate(num_reg_qubits: int, T: int, distribution_type: str, ux_input: float, uy_input: float):
    """
    Simulates a 2D advection-diffusion problem using a Quantum Lattice Boltzmann Method (QLBM)
    and generates a GIF animation of the simulation's evolution using Plotly.

    Args:
        num_reg_qubits: Number of qubits per spatial register; the grid is N x N
            with N = 2**num_reg_qubits.
        T: Total number of timesteps to evolve. With T == 0 only the initial
            state is rendered.
        distribution_type: One of "Sine Wave (Original)", "Gaussian" or "Random".
            Any other value prints a warning and falls back to the sine wave.
        ux_input: x-component of the advection velocity.
        uy_input: y-component of the advection velocity.

    Returns:
        The path (as ``str``) of the written GIF, or ``None`` when no frame data
        could be produced/loaded or when rendering/encoding failed.
    """
    simulation_fps = 0.1
    frames = int(simulation_fps * T)
    if frames == 0:
        frames = 1

    num_anc = 3
    num_qubits_total = 2 * num_reg_qubits + num_anc
    current_N = 2**num_reg_qubits
    N_tot_state_vector = 2**num_qubits_total
    # Single-process run: the "rank" bookkeeping below is kept so the slicing
    # arithmetic stays explicit, but there is exactly one rank.
    num_ranks = 1
    rank = 0
    N_sub_per_rank = int(N_tot_state_vector // num_ranks)

    # A snapshot is written every `timesteps_per_frame` steps.
    timesteps_per_frame = 1
    if 0 < frames < T:
        timesteps_per_frame = int(T / frames)
    if timesteps_per_frame == 0:
        timesteps_per_frame = 1

    def sine_wave_distribution(x, y, N_val_func):
        return np.sin(x * 2 * np.pi / N_val_func) * (1 - 0.5 * x / N_val_func) * \
            np.sin(y * 4 * np.pi / N_val_func) * (1 - 0.5 * y / N_val_func) + 1

    def gaussian_distribution(x, y, N_val_func):
        return np.exp(-((x - N_val_func / 2)**2 / (2 * (N_val_func / 5)**2) +
                        (y - N_val_func / 2)**2 / (2 * (N_val_func / 5)**2))) * 1.8 + 0.2

    def random_distribution(x, y, N_val_func):
        # Called either with plain ints (shape taken from N_val_func) or with
        # coordinate arrays (shape taken from x).
        if isinstance(x, int):
            return np.random.rand(N_val_func, N_val_func) * 1.5 + 0.2
        return np.random.rand(x.shape[0], x.shape[1]) * 1.5 + 0.2

    known_distributions = {
        "Sine Wave (Original)": sine_wave_distribution,
        "Gaussian": gaussian_distribution,
        "Random": random_distribution,
    }
    selected_initial_state_function_raw = known_distributions.get(distribution_type)
    if selected_initial_state_function_raw is None:
        print(f"Warning: Unknown distribution type '{distribution_type}'. Defaulting to Sine Wave.")
        selected_initial_state_function_raw = sine_wave_distribution

    def initial_state_func_eval(x_coords, y_coords):
        # Evaluate the chosen profile and zero out anything with y >= N.
        return selected_initial_state_function_raw(x_coords, y_coords, current_N) * \
            (y_coords < current_N).astype(int)

    # Snapshots (.npy) and rendered PNG frames live in a temporary directory
    # that is removed when this block exits; only the GIF is kept.
    with tempfile.TemporaryDirectory() as tmp_npy_dir:
        intermediate_folder_path = Path(tmp_npy_dir)
        cudaq.set_target('nvidia', option='fp64')

        # The quantum routines below use gate names (x, h, prep_op) and
        # cudaq.qvector, so they must be compiled as CUDA-Q kernels.
        @cudaq.kernel
        def alloc_kernel(num_qubits_alloc: int):
            qubits = cudaq.qvector(num_qubits_alloc)

        from cupy.cuda.memory import MemoryPointer, UnownedMemory

        def to_cupy_array(state):
            """Wrap the buffer of a cudaq state in a CuPy array without copying.

            The array aliases the state's memory (kept alive via ``owner=state``),
            so writes through the array modify the state itself.
            """
            tensor = state.getTensor()
            pDevice = tensor.data()
            sizeByte = tensor.get_num_elements() * tensor.get_element_size()
            mem = UnownedMemory(pDevice, sizeByte, owner=state)
            memptr_obj = MemoryPointer(mem, 0)
            # complex128 matches the 'fp64' option selected in set_target above.
            return cp.ndarray(tensor.get_num_elements(),
                              dtype=cp.complex128,
                              memptr=memptr_obj)

        class QLBMAdvecDiffD2Q5_new:
            """D2Q5 advection-diffusion stepper built from CUDA-Q kernels."""

            def __init__(self, ux=0.2, uy=0.15) -> None:
                self.dim = 2
                self.ndir = 5
                self.nq_dir = math.ceil(np.log2(self.ndir))
                # Direction labels as zero-padded bit strings: '000' .. '100'.
                self.dirs = [f"{dir_int:b}".zfill(self.nq_dir) for dir_int in range(self.ndir)]
                self.e_unitvec = np.array([0, 1, -1, 1, -1])
                self.wts = np.array([2/6, 1/6, 1/6, 1/6, 1/6])
                self.cs = 1 / np.sqrt(3)
                self.ux = ux
                self.uy = uy
                self.u = np.array([0, self.ux, self.ux, self.uy, self.uy])
                self.wtcoeffs = np.multiply(self.wts, 1 + self.e_unitvec * self.u / self.cs**2)
                self.create_circuit()

            def create_circuit(self):
                """Register the 'prep_op' unitary and build ``self.run_timestep``."""
                # Householder-style reflection 2 v v^T - I built from the
                # square roots of the (zero-padded) weight coefficients.
                v = np.pad(self.wtcoeffs, (0, 2**num_anc - self.ndir))
                v = v**0.5
                v[0] += 1
                v = v / np.linalg.norm(v)
                U_prep = 2 * np.outer(v, v) - np.eye(len(v))
                cudaq.register_operation("prep_op", U_prep)

                # Flat list of direction bits, each direction's bits reversed.
                self.dirs_i_list = [int(c) for dir_str in self.dirs for c in reversed(dir_str)]

                @cudaq.kernel
                def rshift(q: cudaq.qview, n: int):
                    for i in range(n):
                        if i == n - 1:
                            x(q[n - 1 - i])
                        elif i == n - 2:
                            x.ctrl(q[n - 1 - (i + 1)], q[n - 1 - i])
                        else:
                            x.ctrl(q[0:n - 1 - i], q[n - 1 - i])

                @cudaq.kernel
                def lshift(q: cudaq.qview, n: int):
                    for i in range(n):
                        if i == 0:
                            x(q[0])
                        elif i == 1:
                            x.ctrl(q[0], q[1])
                        else:
                            x.ctrl(q[0:i], q[i])

                @cudaq.kernel
                def d2q5_tstep(q: cudaq.qview, nqx: int, nqy: int, nq_dir_val: int, dirs_i_val: list[int]):
                    qx = q[0:nqx]
                    qy = q[nqx:nqx + nqy]
                    qdir = q[nqx + nqy:nqx + nqy + nq_dir_val]

                    # Each stanza: flip the direction qubits whose pattern bit
                    # is 0, apply the shift controlled on qdir, then undo the
                    # flips.
                    idx_lqx = 2
                    b_list = dirs_i_val[idx_lqx * nq_dir_val:(idx_lqx + 1) * nq_dir_val]
                    for j in range(nq_dir_val):
                        if b_list[j] == 0:
                            x(qdir[j])
                    cudaq.control(lshift, qdir, qx, nqx)
                    for j in range(nq_dir_val):
                        if b_list[j] == 0:
                            x(qdir[j])

                    idx_rqx = 1
                    b_list = dirs_i_val[idx_rqx * nq_dir_val:(idx_rqx + 1) * nq_dir_val]
                    for j in range(nq_dir_val):
                        if b_list[j] == 0:
                            x(qdir[j])
                    cudaq.control(rshift, qdir, qx, nqx)
                    for j in range(nq_dir_val):
                        if b_list[j] == 0:
                            x(qdir[j])

                    idx_lqy = 4
                    b_list = dirs_i_val[idx_lqy * nq_dir_val:(idx_lqy + 1) * nq_dir_val]
                    for j in range(nq_dir_val):
                        if b_list[j] == 0:
                            x(qdir[j])
                    cudaq.control(lshift, qdir, qy, nqy)
                    for j in range(nq_dir_val):
                        if b_list[j] == 0:
                            x(qdir[j])

                    idx_rqy = 3
                    b_list = dirs_i_val[idx_rqy * nq_dir_val:(idx_rqy + 1) * nq_dir_val]
                    for j in range(nq_dir_val):
                        if b_list[j] == 0:
                            x(qdir[j])
                    cudaq.control(rshift, qdir, qy, nqy)
                    for j in range(nq_dir_val):
                        if b_list[j] == 0:
                            x(qdir[j])

                @cudaq.kernel
                def d2q5_tstep_wrapper(state_arg: cudaq.State, nqx: int, nqy: int, nq_dir_val: int, dirs_i_val: list[int]):
                    q = cudaq.qvector(state_arg)
                    qdir = q[nqx + nqy:nqx + nqy + nq_dir_val]
                    prep_op(qdir[2], qdir[1], qdir[0]) # Assumes nq_dir_val is always 3
                    d2q5_tstep(q, nqx, nqy, nq_dir_val, dirs_i_val)
                    prep_op(qdir[2], qdir[1], qdir[0])

                # NOTE(review): this variant is annotated as taking list[complex]
                # while run_timestep_func hands it whatever state it was given;
                # it is only reached with hadamard=True - confirm the expected
                # argument type before relying on it.
                @cudaq.kernel
                def d2q5_tstep_wrapper_hadamard(vec_arg: list[complex], nqx: int, nqy: int, nq_dir_val: int, dirs_i_val: list[int]):
                    q = cudaq.qvector(vec_arg)
                    qdir = q[nqx + nqy:nqx + nqy + nq_dir_val]
                    qy = q[nqx:nqx + nqy]
                    prep_op(qdir[2], qdir[1], qdir[0])
                    d2q5_tstep(q, nqx, nqy, nq_dir_val, dirs_i_val)
                    prep_op(qdir[2], qdir[1], qdir[0])
                    for i in range(nqy):
                        h(qy[i])

                def run_timestep_func(vec_arg, hadamard=False):
                    """Advance one timestep and zero the discarded part of the state in place."""
                    if hadamard:
                        result = cudaq.get_state(d2q5_tstep_wrapper_hadamard, vec_arg, num_reg_qubits, num_reg_qubits, self.nq_dir, self.dirs_i_list)
                    else:
                        result = cudaq.get_state(d2q5_tstep_wrapper, vec_arg, num_reg_qubits, num_reg_qubits, self.nq_dir, self.dirs_i_list)
                    num_nonzero_ranks = num_ranks / (2**num_anc)
                    rank_slice_cupy = to_cupy_array(result)  # view onto the state's buffer

                    # Ranks at or beyond num_nonzero_ranks are cleared entirely.
                    # With num_ranks == 1 and rank == 0 this is never taken
                    # (0 >= 1/8 is false); kept for the multi-rank layout.
                    if rank >= num_nonzero_ranks and num_nonzero_ranks > 0:
                        sub_sv_zeros = np.zeros(N_sub_per_rank, dtype=np.complex128)
                        cp.cuda.runtime.memcpy(rank_slice_cupy.data.ptr, sub_sv_zeros.ctypes.data, sub_sv_zeros.nbytes, cp.cuda.runtime.memcpyHostToDevice)

                    # Single-rank case: keep only the first 1/2**num_anc of the
                    # amplitudes and zero the rest directly through the view.
                    if rank == 0 and num_nonzero_ranks < 1 and N_sub_per_rank > 0:
                        limit_idx = int(N_tot_state_vector / (2**num_anc))
                        if limit_idx < rank_slice_cupy.size:
                            rank_slice_cupy[limit_idx:] = 0
                    return result

                self.run_timestep = run_timestep_func

            def write_state(self, state_to_write, t_step_str_val):
                """Save the (downsampled) real part of the kept amplitudes as ``<label>_<rank>.npy``.

                Nothing is written, and no file is created, when there is no
                data to save.
                """
                rank_slice_cupy = to_cupy_array(state_to_write)
                num_nonzero_ranks = num_ranks / (2**num_anc)
                if not (rank < num_nonzero_ranks or (rank == 0 and num_nonzero_ranks <= 0)):
                    return

                data_limit = N_sub_per_rank
                if num_nonzero_ranks < 1 and rank == 0:  # Only part of rank 0 has data
                    data_limit = int(N_tot_state_vector / (2**num_anc))
                if data_limit > 0:
                    relevant_part_cupy = cp.real(rank_slice_cupy[:data_limit])
                else:
                    relevant_part_cupy = cp.array([], dtype=cp.float64)

                arr_to_save = None
                if relevant_part_cupy.size >= current_N * current_N:  # Enough data for full grid
                    arr_flat = relevant_part_cupy[:current_N * current_N]
                    if downsampling_factor > 1 and current_N > 0:
                        arr_reshaped = arr_flat.reshape((current_N, current_N))
                        arr_downsampled = arr_reshaped[::downsampling_factor, ::downsampling_factor]
                        arr_to_save = arr_downsampled.flatten()
                    else:
                        arr_to_save = arr_flat
                elif relevant_part_cupy.size > 0:  # Partial data
                    if downsampling_factor > 1:
                        arr_to_save = relevant_part_cupy[::downsampling_factor]
                    else:
                        arr_to_save = relevant_part_cupy

                if arr_to_save is None or arr_to_save.size == 0:
                    # Do not leave a zero-byte .npy behind: it would be picked
                    # up as a frame later and fail to load.
                    return

                save_path = intermediate_folder_path / f"{t_step_str_val}_{rank}.npy"
                with open(save_path, 'wb') as f:
                    np.save(f, arr_to_save.get() if isinstance(arr_to_save, cp.ndarray) else arr_to_save)

            def run_evolution(self, initial_state_arg, total_timesteps, observable=False):
                """Run ``total_timesteps`` steps, writing a snapshot every ``timesteps_per_frame`` steps.

                With ``observable=True`` the last step uses the Hadamard variant
                and is saved under the ``"<T>_h"`` label. The final state is
                stored in ``self.final_state``.
                """
                current_state_val = initial_state_arg
                for t_iter in range(total_timesteps):
                    if t_iter == total_timesteps - 1 and observable:
                        next_state_val = self.run_timestep(current_state_val, True)
                        self.write_state(next_state_val, str(t_iter + 1) + "_h")
                    else:
                        next_state_val = self.run_timestep(current_state_val)
                        if (t_iter + 1) % timesteps_per_frame == 0:
                            self.write_state(next_state_val, str(t_iter + 1))
                    if rank == 0:
                        print(f"Timestep: {t_iter + 1}/{total_timesteps}")
                    cp.get_default_memory_pool().free_all_blocks()
                    current_state_val = next_state_val

                # Make sure the last timestep has a snapshot even when T is not
                # a multiple of timesteps_per_frame; T == 0 saves the initial
                # state under the label "0".
                final_T_state_file_path = intermediate_folder_path / f"{total_timesteps}_{rank}.npy"
                if total_timesteps == 0:
                    if current_state_val is not None:
                        self.write_state(current_state_val, "0")
                elif total_timesteps > 0 and not observable and not final_T_state_file_path.exists():
                    self.write_state(current_state_val, str(total_timesteps))

                if rank == 0:
                    print(f"Timestep: {total_timesteps}/{total_timesteps} (Evolution complete)")
                cp.get_default_memory_pool().free_all_blocks()
                self.final_state = current_state_val

        # --- END OF QLBM CLASS ---

        # Snapshots keep every `downsampling_factor`-th grid point per axis;
        # the factor is capped at the grid size.
        downsampling_factor = 2**5
        if current_N == 0:
            print("Error: current_N is zero. num_reg_qubits likely too small.")
            return None
        if current_N < downsampling_factor:
            downsampling_factor = current_N if current_N > 0 else 1

        qlbm_obj = QLBMAdvecDiffD2Q5_new(ux=ux_input, uy=uy_input)
        initial_state_val = cudaq.get_state(alloc_kernel, num_qubits_total)

        xv_init = np.arange(current_N)
        yv_init = np.arange(current_N)
        initial_grid_2d_X, initial_grid_2d_Y = np.meshgrid(xv_init, yv_init)
        if distribution_type == "Random":
            initial_grid_2d = selected_initial_state_function_raw(current_N, current_N, current_N)  # Pass N for shape
        else:
            initial_grid_2d = initial_state_func_eval(initial_grid_2d_X, initial_grid_2d_Y)

        # Host-side state vector: grid values first, everything else zero.
        sub_sv_init_flat = initial_grid_2d.flatten().astype(np.complex128)
        full_initial_sv_host = np.zeros(N_sub_per_rank, dtype=np.complex128)
        num_computational_states = current_N * current_N
        if len(sub_sv_init_flat) == num_computational_states:
            if num_computational_states <= N_sub_per_rank:
                full_initial_sv_host[:num_computational_states] = sub_sv_init_flat
            else:
                print(f"Error: Grid data {num_computational_states} > N_sub_per_rank {N_sub_per_rank}")
                return None
        else:
            print(f"Warning: Initial state size {len(sub_sv_init_flat)} != expected {num_computational_states}")
            fill_len = min(len(sub_sv_init_flat), num_computational_states, N_sub_per_rank)
            full_initial_sv_host[:fill_len] = sub_sv_init_flat[:fill_len]

        # Overwrite the freshly allocated state's buffer with the initial data.
        rank_slice_init = to_cupy_array(initial_state_val)
        print(f'Rank {rank}: Initializing state with {distribution_type} (ux={ux_input}, uy={uy_input})...')
        cp.cuda.runtime.memcpy(rank_slice_init.data.ptr, full_initial_sv_host.ctypes.data, full_initial_sv_host.nbytes, cp.cuda.runtime.memcpyHostToDevice)
        print(f'Rank {rank}: Initial state copied. Size: {len(sub_sv_init_flat)}. N_sub_per_rank: {N_sub_per_rank}')

        print("Starting QLBM evolution...")
        qlbm_obj.run_evolution(initial_state_val, T)
        print("QLBM evolution complete.")

        print("Generating animation with Plotly...")
        downsampled_N = current_N // downsampling_factor
        if downsampled_N == 0 and current_N > 0:
            downsampled_N = 1
        elif current_N == 0:
            print("Error: current_N is zero before Plotly stage.")
            return None

        # Collect the labels of the snapshots that were actually written.
        plotted_timesteps_str = []
        if T == 0:
            if (intermediate_folder_path / f"0_{rank}.npy").exists():
                plotted_timesteps_str.append("0")
        else:
            if timesteps_per_frame > 0:
                for t_step_val in range(timesteps_per_frame, T + 1, timesteps_per_frame):
                    if intermediate_folder_path.joinpath(f"{t_step_val}_{rank}.npy").exists():
                        plotted_timesteps_str.append(str(t_step_val))
            final_T_path = intermediate_folder_path.joinpath(f"{T}_{rank}.npy")
            if final_T_path.exists() and str(T) not in plotted_timesteps_str:
                plotted_timesteps_str.append(str(T))
        plotted_timesteps_str = sorted(set(plotted_timesteps_str), key=int)
        if not plotted_timesteps_str:
            print(f"Warning: No .npy files found for plotting T={T}. Animation may fail.")
            return None

        data_frames_list = []
        actual_timesteps_for_title = []
        for i_str_val in plotted_timesteps_str:
            try:
                file_path_load = intermediate_folder_path / f"{i_str_val}_{rank}.npy"
                sol_loaded = np.load(file_path_load)
                if sol_loaded.size == downsampled_N * downsampled_N:
                    Z_data = np.reshape(sol_loaded, (downsampled_N, downsampled_N))
                    data_frames_list.append(Z_data)
                    actual_timesteps_for_title.append(i_str_val)
                else:
                    print(f"Warning: File {file_path_load} size {sol_loaded.size} != expected {downsampled_N*downsampled_N}. Skipping.")
                    continue
            except FileNotFoundError:
                print(f"Warning: File {file_path_load} not found. Skipping.")
                continue
            except Exception as e_load:
                # Best effort: one unreadable snapshot should not abort the
                # whole animation.
                print(f"Error loading/reshaping {file_path_load}: {e_load}. Skipping.")
                continue
        if not data_frames_list:
            print("Error: No data frames loaded. Cannot create animation.")
            return None

        x_coords_plot = np.linspace(-10, 10, downsampled_N)
        y_coords_plot = np.linspace(-10, 10, downsampled_N)

        # Frames are normalised by the peak of the first frame; the colour and
        # z range extend to the largest peak over all frames.
        first_frame = data_frames_list[0]
        first_frame_peak = np.max(np.abs(first_frame)) if first_frame.size > 0 else 0
        norm_factor_plotly = first_frame_peak if first_frame_peak != 0 else 1.0
        all_data_max_val = norm_factor_plotly
        max_vals_frames = [np.max(np.abs(d_frame)) for d_frame in data_frames_list if d_frame.size > 0]
        if max_vals_frames:
            global_max_val_frames = max(max_vals_frames)
            if global_max_val_frames > 0:
                all_data_max_val = global_max_val_frames
        clim_upper_plotly = (all_data_max_val / norm_factor_plotly) * 1.0
        if clim_upper_plotly == 0:
            clim_upper_plotly = 0.6

        results_base_dir = "Results"
        gif_frames_for_naming = int(simulation_fps * T)
        if gif_frames_for_naming == 0:
            gif_frames_for_naming = 1
        dist_name_part = distribution_type.replace(' ','').replace('(Original)','').replace('(','').replace(')','')
        specific_folder_name = (f"d2q5_plotly_N{current_N}_T{T}_fr{gif_frames_for_naming}_"
                                f"dist{dist_name_part}_ux{ux_input:.2f}_uy{uy_input:.2f}")
        final_output_dir = Path(results_base_dir) / specific_folder_name
        os.makedirs(final_output_dir, exist_ok=True)
        gif_path_to_return = final_output_dir / "animation_plotly.gif"

        # One figure is reused for every frame; only z and the title change.
        Z_initial_placeholder = np.zeros((downsampled_N, downsampled_N))
        fig = go.Figure(data=[go.Surface(
            z=Z_initial_placeholder, x=x_coords_plot, y=y_coords_plot,
            colorscale='Viridis', cmin=0, cmax=clim_upper_plotly,
            colorbar=dict(
                title=dict(text='Density', side='right', font=dict(color='white')),
                tickfont=dict(size=10, color='white'),
                bgcolor='rgba(0,0,0,0.4)', bordercolor='gray', outlinewidth=1,
                thickness=20, len=0.8, x=0.85, y=0.5
            ))])
        fig.update_layout(
            paper_bgcolor='black', plot_bgcolor='black', font=dict(color='white'),
            scene=dict(
                bgcolor='black',
                xaxis=dict(title=dict(text='X Axis', font=dict(color='white')),
                           tickfont=dict(color='white'),
                           gridcolor='rgba(128,128,128,0.3)', zerolinecolor='rgba(128,128,128,0.5)',
                           linecolor='rgba(128,128,128,0.5)'),
                yaxis=dict(title=dict(text='Y Axis', font=dict(color='white')),
                           tickfont=dict(color='white'),
                           gridcolor='rgba(128,128,128,0.3)', zerolinecolor='rgba(128,128,128,0.5)',
                           linecolor='rgba(128,128,128,0.5)'),
                zaxis=dict(title=dict(text='Density', font=dict(color='white')),
                           tickfont=dict(color='white'),
                           range=[0, clim_upper_plotly], gridcolor='rgba(128,128,128,0.3)',
                           zerolinecolor='rgba(128,128,128,0.5)', linecolor='rgba(128,128,128,0.5)'),
                aspectratio=dict(x=1, y=1, z=0.7),
                camera=dict(eye=dict(x=1.5, y=1.5, z=1.0), center=dict(x=0, y=0, z=-0.1), up=dict(x=0,y=0,z=1))
            ),
            margin=dict(l=10, r=10, b=10, t=60), width=800, height=700
        )

        temp_image_files = []
        gif_fps_val = 10
        try:
            for k, Z_frame_data in enumerate(data_frames_list):
                Z_plotly_frame = Z_frame_data / norm_factor_plotly
                fig.data[0].z = Z_plotly_frame
                current_ts_title = actual_timesteps_for_title[k] if k < len(actual_timesteps_for_title) else "Unknown"
                fig.update_layout(
                    title_text=f'QLBM Simulation (Plotly) - Timestep: {current_ts_title}',
                    title_x=0.5, title_font_size=16
                )
                temp_image_path = intermediate_folder_path / f"plotly_frame_{k:04d}.png"
                fig.write_image(str(temp_image_path), engine="kaleido")
                temp_image_files.append(str(temp_image_path))

            if not temp_image_files:
                print("Error: No Plotly frames were successfully generated.")
                # Remove the output directory again if nothing else is in it.
                if not any(final_output_dir.iterdir()):
                    try:
                        os.rmdir(final_output_dir)
                    except OSError:
                        pass  # Ignore if it fails (e.g. not empty for other reasons)
                return None

            with imageio.get_writer(str(gif_path_to_return), mode='I', fps=gif_fps_val, loop=0) as writer:
                for filename in temp_image_files:
                    image = imageio.imread(filename)
                    writer.append_data(image)
            print(f"Plotly animation saved to {gif_path_to_return}")
        except Exception as e_plotly_anim:
            # Boundary handler: report the failure to the console and signal it
            # to the caller with None instead of crashing the UI callback.
            print(f"Error during Plotly animation/saving: {e_plotly_anim}")
            print("Ensure 'kaleido' and 'imageio' are installed ('pip install kaleido imageio').")
            return None
        return str(gif_path_to_return)
| # --- Gradio Interface Definition --- | |
def qlbm_gradio_interface(num_reg_qubits_input: int, timescale_input: int, distribution_type_param: str, ux_param: float, uy_param: float):
    """Gradio callback: coerce the widget values and run the simulation.

    The two slider counts are converted to ``int`` and the two velocities to
    ``float`` before being handed to ``simulate_qlbm_and_animate``.

    Returns:
        The GIF path returned by the simulation, or ``None`` (after raising a
        Gradio warning) when the simulation produced no animation.
    """
    qubit_count = int(num_reg_qubits_input)
    total_steps = int(timescale_input)
    velocity_x = float(ux_param)
    velocity_y = float(uy_param)
    print(f"Gradio Interface: num_reg_qubits={qubit_count}, T={total_steps}, Distribution={distribution_type_param}, ux={velocity_x}, uy={velocity_y}")

    animation_path = simulate_qlbm_and_animate(
        num_reg_qubits=qubit_count,
        T=total_steps,
        distribution_type=distribution_type_param,
        ux_input=velocity_x,
        uy_input=velocity_y,
    )
    if animation_path is not None:
        return animation_path

    # Surface the failure in the UI; details were printed by the simulation.
    gr.Warning("Animation generation failed. Please check console for errors.")
    return None
# Gradio UI: parameter controls on the left, the resulting GIF on the right.
with gr.Blocks(theme=gr.themes.Soft(), title="QLBM Simulation with Plotly") as qlbm_demo:
    # The backslash in "\\text" is escaped on purpose: in a normal (non-raw)
    # string literal "\t" would be turned into a TAB and corrupt the formula.
    gr.Markdown(
        """
# ⚛️ Quantum Lattice Boltzmann Method (QLBM) Simulator (Plotly Animation)
Welcome to the Quantum Lattice Boltzmann Method (QLBM) simulator! This version uses Plotly for 3D animation.
**How this Simulator Works:**
This simulator implements a D2Q5 model on a quantum computer simulator (CUDA-Q).
- Control grid size (via Number of Register Qubits: $N=2^{\\text{num_reg_qubits}}$).
- Set total simulation time (Timescale T).
- Choose initial distribution.
- Set advection velocities `ux` and `uy`.
The simulation evolves this state, and a Plotly animation of the density is generated as a GIF.
**Note:** Higher qubit counts and longer timescales are computationally intensive. Advection velocities should be small (e.g., < 0.3).
The output GIF will loop indefinitely. Values for "Number of Register Qubits" above 9 may be unstable or very slow on shared/limited hardware.
"""
    )
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("## Simulation Parameters")
            num_reg_qubits_slider = gr.Slider(
                minimum=2, maximum=10, value=8, step=1,  # Max set to 10
                label="Number of Register Qubits (num_reg_qubits)",
                info="Grid N = 2^num_reg_qubits. Max 10 (Note: >8 slow; >9 may hit simulator/memory limits on free tiers)."
            )
            timescale_slider = gr.Slider(
                minimum=0, maximum=2000, value=100, step=10,
                label="Timescale (T)", info="Total number of timesteps. Max 2000."
            )
            distribution_options = ["Sine Wave (Original)", "Gaussian", "Random"]
            distribution_type_input = gr.Radio(
                choices=distribution_options, value="Sine Wave (Original)",
                label="Initial Distribution Type", info="Select the initial pattern of the substance."
            )
            ux_slider = gr.Slider(
                minimum=-0.4, maximum=0.4, value=0.2, step=0.01,
                label="Advection Velocity ux", info="x-component of background advection."
            )
            uy_slider = gr.Slider(
                minimum=-0.4, maximum=0.4, value=0.15, step=0.01,
                label="Advection Velocity uy", info="y-component of background advection."
            )
            run_qlbm_btn = gr.Button("Run QLBM Simulation (Plotly)", variant="primary")
        with gr.Column(scale=2):
            qlbm_plot_output = gr.Image(label="QLBM Simulation Animation (Plotly GIF)", type="filepath", height=600)

    # Input order must match the positional parameters of qlbm_gradio_interface.
    qlbm_inputs_list = [num_reg_qubits_slider, timescale_slider, distribution_type_input, ux_slider, uy_slider]
    run_qlbm_btn.click(
        fn=qlbm_gradio_interface,
        inputs=qlbm_inputs_list,
        outputs=qlbm_plot_output
    )
    gr.Examples(
        examples=[
            [8, 100, "Sine Wave (Original)", 0.2, 0.15],
            [6, 50, "Gaussian", 0.1, 0.05],
            [4, 30, "Random", -0.05, 0.1],
        ],
        inputs=qlbm_inputs_list,
        outputs=qlbm_plot_output,
        fn=qlbm_gradio_interface,
        cache_examples=False
    )
if __name__ == "__main__":
    # Select the fp64 NVIDIA target up front; on failure report whichever
    # target is active and start the app anyway.
    try:
        cudaq.set_target('nvidia', option='fp64')
        selected_target_name = cudaq.get_target().name
        print(f"CUDA-Q Target successfully set to: {selected_target_name}")
    except Exception as e_target:
        print(f"Warning: Could not set CUDA-Q target to 'nvidia'. Error: {e_target}")
        active_target_name = cudaq.get_target().name
        print(f"Current CUDA-Q Target: {active_target_name}. Performance may be affected.")

    qlbm_demo.launch()