h / pf_engine.py
rishab1090's picture
Create pf_engine.py
4cf972e verified
from __future__ import annotations
import numpy as np
from dataclasses import dataclass
from typing import Optional, Dict
GAMMA_W = 9.81 # kN/m^3 (unit weight of water)
@dataclass
class Priors:
N: int = 2000
c_mu: float = 20.0
c_sd: float = 5.0
phi_mu: float = 30.0
phi_sd: float = 5.0
gamma_mu: float = 19.0
gamma_sd: float = 1.0
z_mu: float = 5.0
z_sd: float = 1.0
ru_mu: float = 0.2
ru_sd: float = 0.05
@dataclass
class Targets:
pf_target: float = 0.05
def compute_slope_deg(dem: np.ndarray, pixel_size: Optional[float] = None) -> np.ndarray:
gy, gx = np.gradient(dem.astype(np.float32)) if pixel_size is None else np.gradient(dem.astype(np.float32), pixel_size)
slope_rad = np.arctan(np.hypot(gx, gy))
return np.degrees(slope_rad).astype(np.float32)
def pf_and_prescriptions(
slope_deg: np.ndarray,
priors: Priors = Priors(),
targets: Targets = Targets(),
mask: Optional[np.ndarray] = None,
seed: Optional[int] = 42,
):
assert slope_deg.ndim == 2, "slope_deg must be 2D"
rng = np.random.default_rng(seed)
th = np.radians(np.clip(slope_deg, 0, 89.9)).astype(np.float32)
cos_th = np.cos(th)[..., None]
sin_th = np.sin(th)[..., None]
N = priors.N
c = rng.normal(priors.c_mu, priors.c_sd, N) # kPa
phi = np.radians(rng.normal(priors.phi_mu, priors.phi_sd, N)) # rad
gamma = rng.normal(priors.gamma_mu, priors.gamma_sd, N) # kN/m^3
z = rng.normal(priors.z_mu, priors.z_sd, N) # m
ru = np.clip(rng.normal(priors.ru_mu, priors.ru_sd, N), 0, 1)
A = (gamma * z)[None, None, :]
Acos2 = A * (cos_th ** 2)
u = (ru * GAMMA_W * z)[None, None, :] * (cos_th ** 2)
numerator = c[None, None, :] + (Acos2 - u) * np.tan(phi)[None, None, :]
denominator = A * (sin_th * cos_th)
denominator = np.maximum(denominator, 1e-6)
FoS = numerator / denominator
Pf = (FoS < 1.0).mean(axis=-1).astype(np.float32)
# cohesion prescription
delta_c_per_sample = np.maximum((1.0 - FoS) * denominator, 0.0)
q_index = np.clip(int(np.ceil((1.0 - targets.pf_target) * N)) - 1, 0, N - 1)
delta_c_sorted = np.partition(delta_c_per_sample, q_index, axis=-1)
delta_c_needed = delta_c_sorted[..., q_index].astype(np.float32)
# friction prescription
Aeff = Acos2 - u
deltaN = np.maximum((1.0 - FoS) * denominator, 0.0)
tan_phi = np.tan(phi)[None, None, :]
with np.errstate(divide='ignore', invalid='ignore'):
needed_delta_tan = np.where(Aeff > 1e-6, deltaN / Aeff, np.inf)
target_tan = tan_phi + needed_delta_tan
target_phi = np.arctan(target_tan)
delta_phi_per_sample = np.degrees(np.maximum(target_phi - phi[None, None, :], 0.0)).astype(np.float32)
delta_phi_sorted = np.partition(delta_phi_per_sample, q_index, axis=-1)
delta_phi_needed = delta_phi_sorted[..., q_index].astype(np.float32)
if mask is not None:
Pf = np.where(mask, Pf, np.nan)
delta_c_needed = np.where(mask, delta_c_needed, np.nan)
delta_phi_needed = np.where(mask, delta_phi_needed, np.nan)
return {
"pf": Pf,
"delta_c_kpa": delta_c_needed,
"delta_phi_deg": delta_phi_needed,
}