| import numpy as np |
| import torch |
|
|
| def solver(u0_batch, t_coordinate, nu): |
| """Solves the Burgers' equation for all times in t_coordinate using a spectral method. |
| |
| Args: |
| u0_batch (np.ndarray): Initial condition [batch_size, N], |
| where batch_size is the number of different initial conditions, |
| and N is the number of spatial grid points. |
| t_coordinate (np.ndarray): Time coordinates of shape [T+1]. |
| It begins with t_0=0 and follows the time steps t_1, ..., t_T. |
| nu (float): Viscosity coefficient. |
| |
| Returns: |
| solutions (np.ndarray): Shape [batch_size, T+1, N]. |
| solutions[:, 0, :] contains the initial conditions (u0_batch), |
| solutions[:, i, :] contains the solutions at time t_coordinate[i]. |
| """ |
| |
| device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') |
| print(f"Using device: {device}") |
| |
| |
| batch_size, N = u0_batch.shape |
| u0_tensor = torch.tensor(u0_batch, dtype=torch.float32, device=device) |
| |
| |
| dx = 1.0 / N |
| |
| |
| |
| k = 2 * np.pi * torch.fft.rfftfreq(N, dx).to(device) |
| |
| |
| M = N // 2 + 1 |
| |
| |
| |
| dealias_mask = torch.ones(M, dtype=torch.bool, device=device) |
| cutoff_idx = int(2 * M // 3) |
| dealias_mask[cutoff_idx:] = False |
| |
| |
| T_steps = len(t_coordinate) - 1 |
| solutions_tensor = torch.zeros((batch_size, T_steps + 1, N), dtype=torch.float32, device=device) |
| solutions_tensor[:, 0, :] = u0_tensor |
| |
| |
| |
| |
| max_u = torch.max(torch.abs(u0_tensor)).item() |
| C_cfl = 0.5 |
| dt_cfl = C_cfl * dx / max(max_u, 1e-10) |
| |
| |
| C_diff = 0.2 |
| dt_diff = C_diff * dx**2 / (2 * nu) if nu > 0 else float('inf') |
| |
| |
| dt_internal = min(dt_cfl, dt_diff, 1e-3) |
| |
| print(f"Grid resolution (N): {N}") |
| print(f"Viscosity coefficient (nu): {nu}") |
| print(f"Maximum wave speed: {max_u:.6f}") |
| print(f"Internal time step: {dt_internal:.6f} (CFL: {dt_cfl:.6f}, Diffusion: {dt_diff:.6f})") |
| |
| |
| def advance_timestep(u_hat, dt): |
| """Advance the solution one time step using a semi-implicit scheme with integrating factor. |
| |
| Args: |
| u_hat: Current solution in Fourier space [batch_size, M] |
| dt: Time step |
| |
| Returns: |
| Updated solution in Fourier space |
| """ |
| |
| u = torch.fft.irfft(u_hat, n=N, dim=1) |
| |
| |
| nonlinear = 0.5 * u**2 |
| nonlinear_hat = torch.fft.rfft(nonlinear, n=N, dim=1) |
| |
| |
| nonlinear_hat = nonlinear_hat * dealias_mask.unsqueeze(0) |
| |
| |
| nonlinear_contribution = -1j * k.unsqueeze(0) * nonlinear_hat |
| |
| |
| |
| integrating_factor = torch.exp(-nu * k.unsqueeze(0)**2 * dt) |
| |
| |
| |
| u_hat_new = integrating_factor * (u_hat + dt * nonlinear_contribution) |
| |
| return u_hat_new |
| |
| |
| u_hat = torch.fft.rfft(u0_tensor, n=N, dim=1) |
| |
| |
| t_current = 0.0 |
| t_idx = 1 |
| |
| while t_idx <= T_steps: |
| t_target = t_coordinate[t_idx] |
| steps_taken = 0 |
| |
| |
| while t_current < t_target: |
| |
| dt = min(dt_internal, t_target - t_current) |
| |
| |
| u_hat = advance_timestep(u_hat, dt) |
| |
| t_current += dt |
| steps_taken += 1 |
| |
| |
| |
| if steps_taken % 50 == 0: |
| u = torch.fft.irfft(u_hat, n=N, dim=1) |
| new_max_u = torch.max(torch.abs(u)).item() |
| |
| |
| new_dt_cfl = C_cfl * dx / max(new_max_u, 1e-10) |
| dt_internal = min(new_dt_cfl, dt_diff, 1e-3) |
| |
| |
| u = torch.fft.irfft(u_hat, n=N, dim=1) |
| solutions_tensor[:, t_idx, :] = u |
| |
| |
| if t_idx % max(1, T_steps // 10) == 0 or t_idx == T_steps: |
| print(f"Completed time step {t_idx}/{T_steps}, t = {t_target:.4f} ({steps_taken} internal steps)") |
| |
| current_max_u = torch.max(torch.abs(u)).item() |
| print(f" Current max|u|: {current_max_u:.6f}") |
| |
| t_idx += 1 |
| |
| |
| solutions = solutions_tensor.cpu().numpy() |
| |
| return solutions |