Spaces:
Sleeping
Sleeping
| """Python wrapper for Fortran NSWE solver""" | |
| import numpy as np | |
| import os | |
| import sys | |
| from pathlib import Path | |
# Optional acceleration: use the compiled Fortran extension when it can be
# imported from this package, otherwise fall back to the pure-Python solver.
# Only the import itself sits inside ``try`` so that nothing else can be
# mistaken for a missing extension.
try:
    from . import nswe_solver
except ImportError:
    HAS_FORTRAN = False
    print("Warning: Fortran NSWE solver not compiled. Using Python fallback.")
else:
    HAS_FORTRAN = True
class NWSESolver:
    """Nonlinear shallow-water equations (NSWE) solver.

    Advances the surface displacement ``eta`` and the two velocity
    components ``u`` and ``v`` on a regular 2D grid using explicit
    centred differences in space and a fixed time step.

    Attributes:
        grid_size: value passed to the constructor; stored but not read by
            any method of this class.
        time_step: time step [s] used by ``solve``.
        g: gravitational acceleration used in the momentum equations and
            in ``compute_wave_celerity`` (9.81).
        rho: 1025.0 -- presumably sea-water density [kg/m^3]; not read by
            any method of this class.
        has_fortran: copy of the module-level ``HAS_FORTRAN`` flag.

    NOTE: the class name spells the acronym "NWSE"; it is kept as-is
    because callers import the class under this name.
    """

    def __init__(self, grid_size=100, time_step=1.0):
        """Store the solver configuration and physical constants."""
        self.grid_size = grid_size
        self.time_step = time_step
        self.g = 9.81
        self.rho = 1025.0
        self.has_fortran = HAS_FORTRAN

    def solve(self, bathymetry, initial_wave, duration, dx=1000.0, dy=1000.0):
        """Solve NSWE for given bathymetry and initial wave.

        Args:
            bathymetry: 2D array of water depths [m]
            initial_wave: 2D array of initial surface displacement [m];
                must have the same shape as ``bathymetry``
            duration: simulation duration [s]
            dx, dy: grid spacing [m]

        Returns:
            eta, u, v: final state arrays (float, same shape as the inputs).
            The inputs are never modified.

        Raises:
            ValueError: if ``bathymetry`` is not two-dimensional or the two
                input arrays differ in shape.
        """
        # Work in floating point from the start: an integer input array would
        # otherwise truncate every updated value when it is written back.
        H = np.asarray(bathymetry, dtype=float)
        eta0 = np.asarray(initial_wave, dtype=float)
        if H.ndim != 2:
            raise ValueError(
                f"bathymetry must be a 2D array, got {H.ndim} dimension(s)"
            )
        if eta0.shape != H.shape:
            raise ValueError(
                f"initial_wave shape {eta0.shape} does not match "
                f"bathymetry shape {H.shape}"
            )

        nx, ny = H.shape
        # Round away floating-point noise before truncating, so that e.g.
        # 0.3 / 0.1 (== 2.9999999999999996) yields 3 steps instead of 2.
        nt = int(round(duration / self.time_step, 9))

        solver = self._solve_fortran if self.has_fortran else self._solve_python
        return solver(H, eta0, nx, ny, nt, dx, dy)

    def _solve_fortran(self, H, eta0, nx, ny, nt, dx, dy):
        """Entry point for the compiled Fortran solver.

        The call into the extension module is not wired up yet.  Until it
        is, this delegates to the Python implementation so that callers
        receive an evolved state rather than the untouched initial
        condition.
        """
        # TODO: replace with the compiled routine once it is bound, e.g.
        #   nswe_solver.solve_nswe(eta, u, v, H, nx, ny,
        #                          self.time_step, dx, dy, nt)
        return self._solve_python(H, eta0, nx, ny, nt, dx, dy)

    def _solve_python(self, H, eta0, nx, ny, nt, dx, dy):
        """Python fallback solver.

        Runs ``nt`` explicit steps.  Each step updates ``eta`` from the
        continuity equation using the old velocities, then updates ``u`` and
        ``v`` from the momentum equations using the *new* ``eta`` and the old
        velocities, and finally applies the boundary conditions.

        Args:
            H: 2D array of water depths, shape ``(nx, ny)``.
            eta0: 2D array of initial surface displacement, shape ``(nx, ny)``.
            nx, ny: grid dimensions.
            nt: number of time steps (no steps are taken if ``nt <= 0``).
            dx, dy: grid spacing.

        Returns:
            eta, u, v: float arrays holding the final state.
        """
        H = np.asarray(H, dtype=float)
        eta = np.array(eta0, dtype=float)  # float copy; caller's array untouched
        u = np.zeros((nx, ny))
        v = np.zeros((nx, ny))

        dt = self.time_step
        g = self.g
        two_dx = 2 * dx
        two_dy = 2 * dy

        for _ in range(nt):
            eta_new = eta.copy()
            u_new = u.copy()
            v_new = v.copy()

            # Interior views of the old velocities (edges are handled by the
            # boundary conditions below).
            u_c = u[1:-1, 1:-1]
            v_c = v[1:-1, 1:-1]

            # Continuity equation
            du_dx = (u[2:, 1:-1] - u[:-2, 1:-1]) / two_dx
            dv_dy = (v[1:-1, 2:] - v[1:-1, :-2]) / two_dy
            eta_new[1:-1, 1:-1] = eta[1:-1, 1:-1] - dt * (
                H[1:-1, 1:-1] + eta[1:-1, 1:-1]
            ) * (du_dx + dv_dy)

            # u-momentum (pressure gradient from the freshly updated eta)
            deta_dx = (eta_new[2:, 1:-1] - eta_new[:-2, 1:-1]) / two_dx
            u_adv = u_c * (u[2:, 1:-1] - u[:-2, 1:-1]) / two_dx
            v_adv = v_c * (u[1:-1, 2:] - u[1:-1, :-2]) / two_dy
            u_new[1:-1, 1:-1] = u_c - dt * (u_adv + v_adv + g * deta_dx)

            # v-momentum
            deta_dy = (eta_new[1:-1, 2:] - eta_new[1:-1, :-2]) / two_dy
            u_adv = u_c * (v[2:, 1:-1] - v[:-2, 1:-1]) / two_dx
            v_adv = v_c * (v[1:-1, 2:] - v[1:-1, :-2]) / two_dy
            v_new[1:-1, 1:-1] = v_c - dt * (u_adv + v_adv + g * deta_dy)

            # Apply boundary conditions
            eta = self._apply_boundaries(eta_new)
            u = self._apply_boundaries(u_new)
            v = self._apply_boundaries(v_new)

        return eta, u, v

    def _apply_boundaries(self, arr):
        """Apply zero-gradient boundary conditions.

        Copies the nearest interior row/column onto each edge.  ``arr`` is
        modified in place and also returned.  Rows are filled before
        columns, so the corner cells take their values from the already
        updated edge rows.
        """
        # First-axis edges
        arr[0, :] = arr[1, :]
        arr[-1, :] = arr[-2, :]
        # Second-axis edges
        arr[:, 0] = arr[:, 1]
        arr[:, -1] = arr[:, -2]
        return arr

    def compute_wave_celerity(self, H, eta):
        """Compute the wave celerity ``sqrt(g * H)``.

        Args:
            H: water depth (scalar or array).
            eta: surface displacement; accepted for interface compatibility
                but not used by the current formula.
                NOTE(review): confirm whether ``sqrt(g * (H + eta))`` was
                intended.

        Returns:
            ``np.sqrt(self.g * H)``, with the same shape as ``H``.
        """
        c0 = np.sqrt(self.g * H)
        return c0