# Repository page header (not part of the module):
#   Tadiwa-M — "Deploy: auto-derive dedup radius (drop hardcoded 3m bypass)" (commit 58aefd4)
#   File size: 11.7 kB
"""Line-transect distance sampling — detection-function fit + density estimate.
A faithful, dependency-light implementation of conventional distance sampling
(Buckland, Anderson, Burnham, Laake, Borchers & Thomas, *Introduction to
Distance Sampling*, 2001). It estimates animal **density** (and, given an area,
**abundance**) from the perpendicular distances of detected objects to a survey
line, correcting for the fact that detection probability falls off with distance.
Why this matters for Prometheus: a detector counts what it *sees*; distance
sampling estimates what is actually *there*, with confidence intervals — the
number an ecologist can put in a management report.
Model (line transect, one detection function over [0, w]):

    detected-distance pdf    f(y) = g(y) / mu,   mu = integral_0^w g(y) dy
    effective strip width    ESW  = mu           (per side; g(0) = 1)
    density                  D    = n / (2 * L * mu)
    abundance                N    = D * A
    avg detection prob.      Pa   = mu / w
    encounter rate           n / L

`mu` is the *effective strip half-width* (ESW): the half-width of a hypothetical
strip within which all objects would be detected to give the same expected
count. Detection functions supported: half-normal and hazard-rate, selected by
AIC. Uncertainty is via a nonparametric bootstrap (encounter-rate variance from
a Poisson resample of n, detection-function variance from resampling distances),
which is robust and easy to validate against a known synthetic truth.
Only numpy + scipy are required; nothing here imports torch/ultralytics.
"""
from __future__ import annotations
import math
from dataclasses import dataclass, field
from typing import Literal
import numpy as np
from scipy.integrate import quad
from scipy.optimize import minimize
Model = Literal["half-normal", "hazard-rate"]
# ---------------------------------------------------------------------------
# Detection functions
# ---------------------------------------------------------------------------
class DetectionFunction:
    """Detection function g(y) on [0, w], scaled so that g(0) = 1.

    The likelihood used here is that of the *detected* perpendicular
    distances: the product of f(y_i) = g(y_i) / mu, i.e. the pdf of a distance
    conditional on the object being detected inside the truncation width w.

    With ``model == "half-normal"`` the parameter vector is ``(sigma,)``; any
    other model value is evaluated with the hazard-rate formula and a
    parameter vector ``(sigma, b)``.
    """

    def __init__(self, model: Model, params: np.ndarray, w: float):
        self.model = model
        self.params = np.asarray(params, dtype=float)
        self.w = float(w)
        # Number of free parameters, used as the AIC penalty term.
        self._n_params = len(self.params)

    # -- shape --------------------------------------------------------------
    def g(self, y: np.ndarray | float) -> np.ndarray:
        """Return the detection probability at distance(s) ``y``."""
        dist = np.asarray(y, dtype=float)
        if self.model != "half-normal":
            # hazard-rate: g(y) = 1 - exp(-(y/sigma)^(-b))
            scale, shape = self.params
            with np.errstate(divide="ignore", over="ignore", invalid="ignore"):
                scaled = np.where(dist > 0, dist / scale, np.inf)
                curve = 1.0 - np.exp(-np.power(scaled, -shape))
            # On the line itself (y <= 0) detection is certain by definition.
            return np.where(dist <= 0, 1.0, curve)
        (scale,) = self.params
        return np.exp(-(dist**2) / (2.0 * scale**2))

    # -- effective strip (half-)width: mu = integral_0^w g(y) dy ------------
    def esw(self) -> float:
        """Return mu, the integral of g over [0, w]."""
        if self.model != "half-normal":
            # No closed form is used for hazard-rate: integrate numerically.
            integral, _abs_err = quad(lambda y: float(self.g(y)), 0.0, self.w, limit=100)
            return float(integral)
        # Half-normal has a closed form through the error function.
        (scale,) = self.params
        half_gauss = math.sqrt(math.pi / 2.0)
        upper = self.w / (scale * math.sqrt(2.0))
        return float(scale * half_gauss * math.erf(upper))

    def p_detect(self) -> float:
        """Average detection probability over [0, w] = ESW / w."""
        return self.esw() / self.w

    # -- likelihood / AIC ---------------------------------------------------
    def neg_log_likelihood(self, distances: np.ndarray) -> float:
        """Return -sum(log f(y_i)), or ``inf`` when the likelihood is undefined."""
        strip = self.esw()
        if not (strip > 0 and np.isfinite(strip)):
            return np.inf
        probs = self.g(distances)
        usable = np.all(np.isfinite(probs)) and not np.any(probs <= 0)
        if not usable:
            return np.inf
        # log f(y_i) = log g(y_i) - log mu
        return float(-np.sum(np.log(probs)) + len(distances) * math.log(strip))

    def aic(self, distances: np.ndarray) -> float:
        """Return AIC = 2k + 2 * NLL for the given distances."""
        penalty = 2.0 * self._n_params
        return penalty + 2.0 * self.neg_log_likelihood(distances)
def _fit_one(model: Model, distances: np.ndarray, w: float) -> DetectionFunction:
    """MLE fit of a single detection function to truncated distances.

    Args:
        model: "half-normal" (one parameter, sigma) or hazard-rate
            (two parameters, sigma and b).
        distances: perpendicular distances; values outside [0, w] are dropped
            before fitting.
        w: truncation width; must be positive and finite.

    Returns:
        A DetectionFunction carrying the fitted parameters.

    Raises:
        ValueError: if ``w`` is not positive and finite, or fewer than two
            distances fall inside [0, w].
        RuntimeError: if the optimiser ends at a non-finite likelihood, which
            means no usable parameter estimate was found.
    """
    w = float(w)
    # A zero/negative/infinite width makes the start value and the strip
    # integral meaningless, so reject it up front.
    if not (math.isfinite(w) and w > 0.0):
        raise ValueError("Truncation width w must be positive and finite.")

    d = np.asarray(distances, dtype=float)
    d = d[(d >= 0) & (d <= w)]
    if len(d) < 2:
        raise ValueError("Need at least 2 in-truncation distances to fit a detection function.")

    # Starting scale: data spread, but never smaller than a quarter of the
    # truncation width (keeps g(y) comfortably > 0 at the start point).
    scale0 = max(np.std(d), w / 4.0, 1e-6)

    def nll(theta):
        # optimise in log-space so every parameter stays > 0
        return DetectionFunction(model, np.exp(theta), w).neg_log_likelihood(d)

    if model == "half-normal":
        x0 = np.array([math.log(scale0)])
    else:  # hazard-rate: (sigma, b), start b ~ 2
        x0 = np.array([math.log(scale0), math.log(2.0)])

    res = minimize(nll, x0, method="Nelder-Mead",
                   options={"xatol": 1e-6, "fatol": 1e-6, "maxiter": 2000})
    # If the likelihood is non-finite at the returned point the simplex never
    # found a valid region; returning those parameters would pass off the
    # starting guess (or garbage) as a fit.
    if not np.isfinite(res.fun):
        raise RuntimeError(f"Detection-function fit failed for model {model!r}: "
                           "likelihood is not finite at the optimum.")
    return DetectionFunction(model, np.exp(res.x), w)
# ---------------------------------------------------------------------------
# Result container
# ---------------------------------------------------------------------------
@dataclass
class DistanceSamplingResult:
    """Outcome of a distance-sampling analysis: estimates plus fit diagnostics."""

    model: Model
    n: int  # detections used (within truncation w)
    transect_length: float  # total L (same units as distances)
    truncation_w: float
    area: float | None  # survey region area, for abundance (or None)
    density: float  # animals per unit area
    density_ci: tuple[float, float]
    abundance: float | None
    abundance_ci: tuple[float, float] | None
    esw: float  # effective strip half-width (mu)
    p_detect: float  # average detection prob over [0, w]
    encounter_rate: float  # n / L
    sigma: float  # primary scale parameter
    aic: float
    cv_density: float  # coefficient of variation of D
    params: np.ndarray = field(default_factory=lambda: np.array([]))

    def summary(self) -> str:
        """Return a multi-line, human-readable report of the estimate.

        The abundance line is included only when both ``abundance`` and
        ``abundance_ci`` are set. Interval bounds are joined with an en dash
        so the lower and upper limits cannot be read as one number.
        """
        lo, hi = self.density_ci
        lines = [
            f"Distance sampling — {self.model} detection function",
            f" n detections (within w={self.truncation_w:g}): {self.n}",
            f" effective strip half-width (ESW): {self.esw:.4g}",
            f" avg detection probability: {self.p_detect:.3f}",
            f" encounter rate (n/L): {self.encounter_rate:.4g}",
            f" DENSITY: {self.density:.4g} (95% CI {lo:.4g}–{hi:.4g}, CV {self.cv_density:.1%})",
        ]
        if self.abundance is not None and self.abundance_ci is not None:
            alo, ahi = self.abundance_ci
            lines.append(f" ABUNDANCE: {self.abundance:.4g} (95% CI {alo:.4g}–{ahi:.4g})")
        return "\n".join(lines)
# ---------------------------------------------------------------------------
# Top-level estimator
# ---------------------------------------------------------------------------
def estimate_density(
    distances,
    transect_length: float,
    truncation: float | None = None,
    area: float | None = None,
    model: Model | Literal["auto"] = "auto",
    two_sided: bool = True,
    n_boot: int = 1000,
    ci: float = 0.95,
    seed: int | None = 0,
) -> DistanceSamplingResult:
    """Estimate density (and abundance) from perpendicular detection distances.

    Args:
        distances: perpendicular distances of detected objects to the transect
            line (same length unit throughout). Non-finite and negative values
            are dropped.
        transect_length: total length L of the survey line(s); must be
            positive and finite.
        truncation: right-truncation width w. Detections beyond w are discarded
            (standard practice — the far tail is noisy). Defaults to the largest
            observed distance.
        area: survey region area A, to also return abundance N = D * A. Optional.
        model: "half-normal", "hazard-rate", or "auto" (pick lower AIC).
        two_sided: True for a line transect detecting on both sides (effective
            covered area = 2 * ESW * L); False for one-sided strip.
        n_boot: bootstrap iterations for the confidence interval.
        ci: confidence level (0.95 -> 95% interval); must lie in (0, 1).
        seed: RNG seed for reproducible bootstrap (None for nondeterministic).

    Returns:
        DistanceSamplingResult with density, CI, ESW, detection probability,
        encounter rate, and (if area given) abundance. The interval bounds are
        NaN when no bootstrap replicate produced a finite density, and the CV
        is NaN when fewer than two did.

    Raises:
        ValueError: on fewer than two usable distances, a non-positive or
            non-finite ``transect_length`` or truncation width, or ``ci``
            outside (0, 1).
        RuntimeError: in "auto" mode when no candidate model could be fit.
    """
    d_all = np.asarray(distances, dtype=float)
    d_all = d_all[np.isfinite(d_all) & (d_all >= 0)]
    if len(d_all) < 2:
        raise ValueError("Need at least 2 non-negative distances.")

    # L sits in the denominator of D: zero would divide by zero, a negative
    # value would flip the sign of the density, NaN would propagate silently.
    length = float(transect_length)
    if not (math.isfinite(length) and length > 0.0):
        raise ValueError("transect_length must be a positive, finite number.")
    if not 0.0 < ci < 1.0:
        raise ValueError("ci must lie strictly between 0 and 1.")

    w = float(truncation) if truncation is not None else float(d_all.max())
    # w == 0 happens when every distance is 0 and no truncation was given;
    # the strip width and p_detect = mu / w are then undefined.
    if not (math.isfinite(w) and w > 0.0):
        raise ValueError("Truncation width must be positive and finite.")
    d = d_all[d_all <= w]
    if len(d) < 2:
        raise ValueError("Need at least 2 distances within the truncation width.")
    side = 2.0 if two_sided else 1.0

    def fit_and_density(dist: np.ndarray, n: int, chosen: Model | None):
        """Fit a detection function to ``dist`` and return (D, fit, model, mu)."""
        if model == "auto" and chosen is None:
            cands = []
            for m in ("half-normal", "hazard-rate"):
                try:
                    df_m = _fit_one(m, dist, w)
                    cands.append((df_m.aic(dist), m, df_m))
                except Exception:  # noqa: BLE001 — a failed candidate just drops out
                    continue
            if not cands:
                raise RuntimeError("No detection function could be fit.")
            _, m_best, df_best = min(cands, key=lambda t: t[0])
        else:
            m_best = chosen or (model if model != "auto" else "half-normal")
            df_best = _fit_one(m_best, dist, w)
        mu = df_best.esw()
        D = n / (side * length * mu)
        return D, df_best, m_best, mu

    # Point estimate on the observed data
    D_hat, df, m_best, mu_hat = fit_and_density(d, len(d), None)

    # Bootstrap CI: Poisson-resample n (encounter-rate variance) and resample
    # distances with replacement (detection-function variance), refit each time
    # with the model chosen for the point estimate.
    rng = np.random.default_rng(seed)
    boot = np.empty(n_boot, dtype=float)
    n_obs = len(d)
    for i in range(n_boot):
        n_star = rng.poisson(n_obs)
        if n_star < 2:
            boot[i] = np.nan
            continue
        d_star = rng.choice(d, size=n_star, replace=True)
        try:
            D_star, *_ = fit_and_density(d_star, n_star, m_best)
            boot[i] = D_star
        except Exception:  # noqa: BLE001 — a failed replicate is dropped, not fatal
            boot[i] = np.nan
    boot = boot[np.isfinite(boot)]

    alpha = 1.0 - ci
    if boot.size >= 1:
        lo, hi = np.quantile(boot, [alpha / 2.0, 1.0 - alpha / 2.0])
    else:
        # n_boot == 0 or every replicate failed: no interval can be formed.
        lo = hi = float("nan")
    # A sample standard deviation needs at least two replicates.
    if boot.size >= 2 and D_hat > 0:
        cv = float(np.std(boot, ddof=1) / D_hat)
    else:
        cv = float("nan")

    N = D_hat * area if area is not None else None
    N_ci = (lo * area, hi * area) if area is not None else None
    return DistanceSamplingResult(
        model=m_best,
        n=n_obs,
        transect_length=length,
        truncation_w=w,
        area=area,
        density=float(D_hat),
        density_ci=(float(lo), float(hi)),
        abundance=(float(N) if N is not None else None),
        abundance_ci=((float(N_ci[0]), float(N_ci[1])) if N_ci is not None else None),
        esw=float(mu_hat),
        p_detect=float(mu_hat / w),
        encounter_rate=float(n_obs / length),
        sigma=float(df.params[0]),
        aic=float(df.aic(d)),
        cv_density=cv,
        params=df.params,
    )