tsunami / src /tsuwave /core /nswe_wrapper.py
Gitdeeper4's picture
رفع جميع ملفات TSU-WAVE مع YAML
12834b7
"""Python wrapper for Fortran NSWE solver"""
import numpy as np
import os
import sys
from pathlib import Path
# Optional compiled backend. The extension module is only present when the
# Fortran sources have been built; everything else in this module must keep
# working without it.
try:
    from . import nswe_solver
except ImportError:
    # Imported locally so this guard does not depend on the file's import list.
    import warnings

    # Keep the name bound so later references fail predictably (None) instead
    # of raising NameError.
    nswe_solver = None
    HAS_FORTRAN = False
    # A library module should not write to stdout at import time; a warning
    # can be filtered or redirected by the application.
    warnings.warn(
        "Warning: Fortran NSWE solver not compiled. Using Python fallback.",
        RuntimeWarning,
    )
else:
    HAS_FORTRAN = True
class NWSESolver:
    """Nonlinear Shallow-Water Equations (NSWE) solver.

    Advances the surface displacement ``eta`` and the velocity components
    ``u`` / ``v`` on a regular 2-D grid using explicit time stepping with
    centred differences. The momentum update uses the elevation that was
    just computed by the continuity update of the same step.

    No stability check (e.g. a CFL condition) is performed; choosing a
    ``time_step`` compatible with ``dx``/``dy`` and the depth is up to the
    caller.
    """

    def __init__(self, grid_size=100, time_step=1.0):
        """Store solver settings.

        Args:
            grid_size: stored on the instance; the solver itself takes the
                grid shape from the bathymetry array passed to ``solve``.
            time_step: time increment per step [s].
        """
        self.grid_size = grid_size
        self.time_step = time_step
        self.g = 9.81  # gravitational acceleration [m/s^2]
        self.rho = 1025.0  # presumably sea-water density [kg/m^3]; not used by this class
        self.has_fortran = HAS_FORTRAN

    def solve(self, bathymetry, initial_wave, duration, dx=1000.0, dy=1000.0):
        """Solve NSWE for given bathymetry and initial wave.

        Args:
            bathymetry: 2D array of water depths [m]
            initial_wave: 2D array of initial surface displacement [m];
                must have the same shape as ``bathymetry``
            duration: simulation duration [s]
            dx, dy: grid spacing [m]

        Returns:
            eta, u, v: final state arrays (float, same shape as the inputs)

        Raises:
            ValueError: if ``bathymetry`` is not 2-D or the two input arrays
                differ in shape.
        """
        # Coerce once: accepts nested lists and prevents integer inputs from
        # truncating every update.
        H = np.asarray(bathymetry, dtype=float)
        eta0 = np.asarray(initial_wave, dtype=float)
        if H.ndim != 2:
            raise ValueError(
                f"bathymetry must be a 2D array, got {H.ndim} dimension(s)"
            )
        if eta0.shape != H.shape:
            raise ValueError(
                f"initial_wave shape {eta0.shape} does not match "
                f"bathymetry shape {H.shape}"
            )

        nx, ny = H.shape
        nt = self._step_count(duration)
        backend = self._solve_fortran if self.has_fortran else self._solve_python
        return backend(H, eta0, nx, ny, nt, dx, dy)

    def _step_count(self, duration):
        """Return the number of whole time steps that fit in ``duration``.

        Plain ``int(duration / time_step)`` drops a step whenever round-off
        leaves the ratio just below an integer (0.3 / 0.1 evaluates to
        2.9999999999999996), so near-integer ratios are snapped first.
        Negative results are clamped to zero (no steps).
        """
        ratio = duration / self.time_step
        nearest = round(ratio)
        if abs(ratio - nearest) <= 1e-9 * max(1.0, abs(ratio)):
            return max(int(nearest), 0)
        return max(int(ratio), 0)

    def _solve_fortran(self, H, eta0, nx, ny, nt, dx, dy):
        """Solve using the compiled backend.

        The compiled routine is not wired up yet, so this runs the Python
        stepper instead of returning the un-stepped initial state.
        """
        # TODO: replace with the compiled call once its interface is settled;
        # the intended shape is
        #   nswe_solver.solve_nswe(eta, u, v, H, nx, ny, dt, dx, dy, nt)
        return self._solve_python(H, eta0, nx, ny, nt, dx, dy)

    def _solve_python(self, H, eta0, nx, ny, nt, dx, dy):
        """Python fallback solver (vectorised with NumPy slices).

        Interior points are updated from centred differences of the previous
        step; edge rows/columns are then filled by ``_apply_boundaries``.
        The inputs are not modified.
        """
        H = np.asarray(H, dtype=float)
        eta = np.array(eta0, dtype=float)  # private float copy
        u = np.zeros((nx, ny))
        v = np.zeros((nx, ny))

        dt = self.time_step
        g = self.g
        two_dx = 2 * dx
        two_dy = 2 * dy
        inner = (slice(1, -1), slice(1, -1))  # every non-edge grid point
        H_in = H[inner]

        for _ in range(nt):
            eta_new = eta.copy()
            u_new = u.copy()
            v_new = v.copy()

            u_c = u[inner]
            v_c = v[inner]
            eta_c = eta[inner]

            # Centred neighbour differences, evaluated at the interior points.
            u_dx = u[2:, 1:-1] - u[:-2, 1:-1]
            u_dy = u[1:-1, 2:] - u[1:-1, :-2]
            v_dx = v[2:, 1:-1] - v[:-2, 1:-1]
            v_dy = v[1:-1, 2:] - v[1:-1, :-2]

            # Continuity equation
            divergence = u_dx / two_dx + v_dy / two_dy
            eta_new[inner] = eta_c - dt * (H_in + eta_c) * divergence

            # Momentum equations: the pressure gradient uses the updated
            # elevation (edge values of eta_new are still the old ones here).
            deta_dx = (eta_new[2:, 1:-1] - eta_new[:-2, 1:-1]) / two_dx
            deta_dy = (eta_new[1:-1, 2:] - eta_new[1:-1, :-2]) / two_dy

            u_new[inner] = u_c - dt * (
                u_c * u_dx / two_dx + v_c * u_dy / two_dy + g * deta_dx
            )
            v_new[inner] = v_c - dt * (
                u_c * v_dx / two_dx + v_c * v_dy / two_dy + g * deta_dy
            )

            # Apply boundary conditions
            eta = self._apply_boundaries(eta_new)
            u = self._apply_boundaries(u_new)
            v = self._apply_boundaries(v_new)

        return eta, u, v

    def _apply_boundaries(self, arr):
        """Apply zero-gradient boundary conditions in place and return ``arr``.

        Each edge row/column is overwritten with its inner neighbour. An axis
        with fewer than two points has no neighbour and is left untouched.
        """
        n_rows, n_cols = arr.shape[0], arr.shape[1]
        # First-axis edges
        if n_rows > 1:
            arr[0, :] = arr[1, :]
            arr[-1, :] = arr[-2, :]
        # Second-axis edges (done second, so corners take the column copy)
        if n_cols > 1:
            arr[:, 0] = arr[:, 1]
            arr[:, -1] = arr[:, -2]
        return arr

    def compute_wave_celerity(self, H, eta):
        """Return the celerity ``sqrt(g * H)`` for depth ``H``.

        Args:
            H: water depth(s) [m]; scalar or array-like.
            eta: accepted for interface compatibility; currently not used in
                the computation.

        Returns:
            Celerity with the same shape as ``H``. Negative depths are not
            special-cased.
        """
        return np.sqrt(self.g * np.asarray(H))