geoforce / solver /coupled.py
Ubuntu
Coupled solver: implicit P->T with upwind advection + wells
464a35c
"""Sequential implicit P → T coupling with upwind advection.
Per time step:
1. Solve Darcy pressure with the well volumetric sources.
2. Compute face mass fluxes from the new pressure.
3. Assemble and solve the energy equation with conduction + upwind
advection + well heat.
Fluid properties (ρ, μ, cp) are held at reference-state constants — this
is the Boussinesq-style linearisation called out in HACKATHON-PLAN.md §7.
Extending to temperature-dependent ρ(T, P), μ(T) is straightforward but
out of scope for the 48-hour build.
"""
from __future__ import annotations
from dataclasses import dataclass
import numpy as np
from scipy.sparse import csr_matrix, lil_matrix
from scipy.sparse.linalg import spsolve
from .darcy import build_transmissibility, step_pressure
from .energy import build_thermal_conductance
from .grid import Grid
from .wells import WellSpec, apply_wells
@dataclass(frozen=True)
class CoupledState:
"""One step of the coupled solver."""
pressure: np.ndarray # (nx, ny), Pa
temperature: np.ndarray # (nx, ny), same unit as input t_old
mass_flux_x: np.ndarray # (nx-1, ny), kg/s, positive in +x
mass_flux_y: np.ndarray # (nx, ny-1), kg/s, positive in +y
def compute_mass_flux(
grid: Grid,
pressure: np.ndarray,
permeability: np.ndarray | float,
mu: float,
rho: float,
) -> tuple[np.ndarray, np.ndarray]:
"""Face-wise mass flow rate [kg/s] from the Darcy law.
Sign convention: flux is positive when flow goes from lower index to
higher index (i → i+1 for x, j → j+1 for y).
"""
Tx, Ty = build_transmissibility(grid, permeability, mu)
mx = rho * Tx * (pressure[:-1, :] - pressure[1:, :])
my = rho * Ty * (pressure[:, :-1] - pressure[:, 1:])
return mx, my
def assemble_energy_with_advection(
grid: Grid,
*,
thermal_conductivity: np.ndarray | float,
volumetric_heat_capacity: np.ndarray | float,
cp_water: float,
dt: float,
t_old: np.ndarray,
mass_flux_x: np.ndarray,
mass_flux_y: np.ndarray,
q_heat_inj: np.ndarray,
m_prod: np.ndarray,
dirichlet: dict[tuple[int, int], float] | None = None,
) -> tuple[csr_matrix, np.ndarray]:
"""Assemble (A, b) for the implicit conduction + upwind advection step."""
nx, ny = grid.shape
n = nx * ny
Cx, Cy = build_thermal_conductance(grid, thermal_conductivity)
rho_cp = np.broadcast_to(volumetric_heat_capacity, grid.shape).astype(np.float64)
storage = grid.cell_volume * rho_cp / dt
dirichlet = dirichlet or {}
A = lil_matrix((n, n), dtype=np.float64)
b = np.zeros(n, dtype=np.float64)
for i in range(nx):
for j in range(ny):
idx = grid.flat_index(i, j)
if (i, j) in dirichlet:
A[idx, idx] = 1.0
b[idx] = dirichlet[(i, j)]
continue
diag = storage[i, j]
# --- Conduction (5-point stencil) ---------------------------------
if i > 0:
c = Cx[i - 1, j]
A[idx, grid.flat_index(i - 1, j)] += -c
diag += c
if i < nx - 1:
c = Cx[i, j]
A[idx, grid.flat_index(i + 1, j)] += -c
diag += c
if j > 0:
c = Cy[i, j - 1]
A[idx, grid.flat_index(i, j - 1)] += -c
diag += c
if j < ny - 1:
c = Cy[i, j]
A[idx, grid.flat_index(i, j + 1)] += -c
diag += c
# --- Advection (upwind) -------------------------------------------
# west face (i-1, j) ↔ (i, j): flow m = mass_flux_x[i-1, j]
if i > 0:
m = mass_flux_x[i - 1, j]
if m >= 0.0:
# inflow from (i-1, j) at T_{i-1,j}
A[idx, grid.flat_index(i - 1, j)] += -m * cp_water
else:
# outflow to (i-1, j) at T_{i, j}
diag += -m * cp_water
# east face
if i < nx - 1:
m = mass_flux_x[i, j]
if m >= 0.0:
diag += m * cp_water
else:
A[idx, grid.flat_index(i + 1, j)] += m * cp_water
# south face
if j > 0:
m = mass_flux_y[i, j - 1]
if m >= 0.0:
A[idx, grid.flat_index(i, j - 1)] += -m * cp_water
else:
diag += -m * cp_water
# north face
if j < ny - 1:
m = mass_flux_y[i, j]
if m >= 0.0:
diag += m * cp_water
else:
A[idx, grid.flat_index(i, j + 1)] += m * cp_water
# --- Production (implicit sink at cell T) -------------------------
diag += m_prod[i, j] * cp_water
A[idx, idx] = diag
# Injection heat source already contains m_dot * cp_water * T_inj [W]
b[idx] = storage[i, j] * t_old[i, j] + q_heat_inj[i, j]
return A.tocsr(), b
def step_coupled(
grid: Grid,
*,
p_old: np.ndarray,
t_old: np.ndarray,
permeability: np.ndarray | float,
porosity: np.ndarray | float,
total_compressibility: float,
mu: float,
rho: float,
cp_water: float,
thermal_conductivity: np.ndarray | float,
volumetric_heat_capacity: np.ndarray | float,
dt: float,
wells: list[WellSpec] | None = None,
dirichlet_t: dict[tuple[int, int], float] | None = None,
) -> CoupledState:
"""Advance (P, T) by one coupled time step."""
wells = wells or []
q_vol, q_heat_inj, m_prod = apply_wells(
wells, grid, rho_ref=rho, cp_water=cp_water
)
pressure_result = step_pressure(
grid=grid,
p_old=p_old,
permeability=permeability,
porosity=porosity,
total_compressibility=total_compressibility,
mu=mu,
dt=dt,
q_vol=q_vol,
)
p_new = pressure_result.pressure
mx, my = compute_mass_flux(grid, p_new, permeability, mu, rho)
A, b = assemble_energy_with_advection(
grid=grid,
thermal_conductivity=thermal_conductivity,
volumetric_heat_capacity=volumetric_heat_capacity,
cp_water=cp_water,
dt=dt,
t_old=t_old,
mass_flux_x=mx,
mass_flux_y=my,
q_heat_inj=q_heat_inj,
m_prod=m_prod,
dirichlet=dirichlet_t,
)
t_new = spsolve(A, b).reshape(grid.shape)
return CoupledState(pressure=p_new, temperature=t_new, mass_flux_x=mx, mass_flux_y=my)
def run_coupled_transient(
grid: Grid,
*,
p_initial: np.ndarray,
t_initial: np.ndarray,
permeability: np.ndarray | float,
porosity: np.ndarray | float,
total_compressibility: float,
mu: float,
rho: float,
cp_water: float,
thermal_conductivity: np.ndarray | float,
volumetric_heat_capacity: np.ndarray | float,
dt: float,
n_steps: int,
wells: list[WellSpec] | None = None,
dirichlet_t: dict[tuple[int, int], float] | None = None,
) -> tuple[np.ndarray, np.ndarray]:
"""Run n_steps of coupled P–T.
Returns ``(p_history, t_history)`` each of shape ``(n_steps + 1, nx, ny)``.
"""
nx, ny = grid.shape
p_hist = np.empty((n_steps + 1, nx, ny), dtype=np.float64)
t_hist = np.empty((n_steps + 1, nx, ny), dtype=np.float64)
p_hist[0] = p_initial
t_hist[0] = t_initial
p, t = p_initial.copy(), t_initial.copy()
for k in range(n_steps):
state = step_coupled(
grid=grid,
p_old=p,
t_old=t,
permeability=permeability,
porosity=porosity,
total_compressibility=total_compressibility,
mu=mu,
rho=rho,
cp_water=cp_water,
thermal_conductivity=thermal_conductivity,
volumetric_heat_capacity=volumetric_heat_capacity,
dt=dt,
wells=wells,
dirichlet_t=dirichlet_t,
)
p = state.pressure
t = state.temperature
p_hist[k + 1] = p
t_hist[k + 1] = t
return p_hist, t_hist