"""Line-transect distance sampling — detection-function fit + density estimate. A faithful, dependency-light implementation of conventional distance sampling (Buckland, Anderson, Burnham, Laake, Borchers & Thomas, *Introduction to Distance Sampling*, 2001). It estimates animal **density** (and, given an area, **abundance**) from the perpendicular distances of detected objects to a survey line, correcting for the fact that detection probability falls off with distance. Why this matters for Prometheus: a detector counts what it *sees*; distance sampling estimates what is actually *there*, with confidence intervals — the number an ecologist can put in a management report. Model (line transect, one detection function over [0, w]): detected-distance pdf f(y) = g(y) / mu, mu = integral_0^w g(y) dy effective strip width ESW = mu (per side; g(0) = 1) density D = n / (2 * L * mu) abundance N = D * A avg detection prob. Pa = mu / w encounter rate n / L `mu` is the *effective strip half-width* (ESW): the half-width of a hypothetical strip within which all objects would be detected to give the same expected count. Detection functions supported: half-normal and hazard-rate, selected by AIC. Uncertainty is via a nonparametric bootstrap (encounter-rate variance from a Poisson resample of n, detection-function variance from resampling distances), which is robust and easy to validate against a known synthetic truth. Only numpy + scipy are required; nothing here imports torch/ultralytics. """ from __future__ import annotations import math from dataclasses import dataclass, field from typing import Literal import numpy as np from scipy.integrate import quad from scipy.optimize import minimize Model = Literal["half-normal", "hazard-rate"] # --------------------------------------------------------------------------- # Detection functions # --------------------------------------------------------------------------- class DetectionFunction: """A fitted detection function g(y) on [0, w] with g(0) = 1. Fit by maximum likelihood on the *detected* perpendicular distances. The likelihood is the product of f(y_i) = g(y_i) / mu, i.e. the pdf of distances conditional on detection within the truncation width w. """ def __init__(self, model: Model, params: np.ndarray, w: float): self.model = model self.params = np.asarray(params, dtype=float) self.w = float(w) self._n_params = len(self.params) # -- shape -------------------------------------------------------------- def g(self, y: np.ndarray | float) -> np.ndarray: y = np.asarray(y, dtype=float) if self.model == "half-normal": (sigma,) = self.params return np.exp(-(y**2) / (2.0 * sigma**2)) # hazard-rate: g(y) = 1 - exp(-(y/sigma)^(-b)) sigma, b = self.params with np.errstate(divide="ignore", over="ignore", invalid="ignore"): ratio = np.where(y > 0, y / sigma, np.inf) out = 1.0 - np.exp(-np.power(ratio, -b)) # at y = 0 the hazard-rate detection prob is exactly 1 return np.where(y <= 0, 1.0, out) # -- effective strip (half-)width: mu = integral_0^w g(y) dy ------------ def esw(self) -> float: if self.model == "half-normal": (sigma,) = self.params return float(sigma * math.sqrt(math.pi / 2.0) * math.erf(self.w / (sigma * math.sqrt(2.0)))) val, _ = quad(lambda y: float(self.g(y)), 0.0, self.w, limit=100) return float(val) def p_detect(self) -> float: """Average detection probability over [0, w] = ESW / w.""" return self.esw() / self.w # -- likelihood / AIC --------------------------------------------------- def neg_log_likelihood(self, distances: np.ndarray) -> float: mu = self.esw() if mu <= 0 or not np.isfinite(mu): return np.inf g = self.g(distances) if np.any(g <= 0) or not np.all(np.isfinite(g)): return np.inf # log f(y_i) = log g(y_i) - log mu return float(-np.sum(np.log(g)) + len(distances) * math.log(mu)) def aic(self, distances: np.ndarray) -> float: return 2.0 * self._n_params + 2.0 * self.neg_log_likelihood(distances) def _fit_one(model: Model, distances: np.ndarray, w: float) -> DetectionFunction: """MLE fit of a single detection function to truncated distances.""" d = np.asarray(distances, dtype=float) d = d[(d >= 0) & (d <= w)] if len(d) < 2: raise ValueError("Need at least 2 in-truncation distances to fit a detection function.") scale0 = max(np.std(d), w / 4.0, 1e-6) def nll(theta): df = DetectionFunction(model, np.exp(theta), w) # optimise in log-space (params > 0) return df.neg_log_likelihood(d) if model == "half-normal": x0 = np.array([math.log(scale0)]) else: # hazard-rate: (sigma, b), start b ~ 2 x0 = np.array([math.log(scale0), math.log(2.0)]) res = minimize(nll, x0, method="Nelder-Mead", options={"xatol": 1e-6, "fatol": 1e-6, "maxiter": 2000}) return DetectionFunction(model, np.exp(res.x), w) # --------------------------------------------------------------------------- # Result container # --------------------------------------------------------------------------- @dataclass class DistanceSamplingResult: model: Model n: int # detections used (within truncation w) transect_length: float # total L (same units as distances) truncation_w: float area: float | None # survey region area, for abundance (or None) density: float # animals per unit area density_ci: tuple[float, float] abundance: float | None abundance_ci: tuple[float, float] | None esw: float # effective strip half-width (mu) p_detect: float # average detection prob over [0, w] encounter_rate: float # n / L sigma: float # primary scale parameter aic: float cv_density: float # coefficient of variation of D params: np.ndarray = field(default_factory=lambda: np.array([])) def summary(self) -> str: lo, hi = self.density_ci lines = [ f"Distance sampling — {self.model} detection function", f" n detections (within w={self.truncation_w:g}): {self.n}", f" effective strip half-width (ESW): {self.esw:.4g}", f" avg detection probability: {self.p_detect:.3f}", f" encounter rate (n/L): {self.encounter_rate:.4g}", f" DENSITY: {self.density:.4g} (95% CI {lo:.4g}–{hi:.4g}, CV {self.cv_density:.1%})", ] if self.abundance is not None and self.abundance_ci is not None: alo, ahi = self.abundance_ci lines.append(f" ABUNDANCE: {self.abundance:.4g} (95% CI {alo:.4g}–{ahi:.4g})") return "\n".join(lines) # --------------------------------------------------------------------------- # Top-level estimator # --------------------------------------------------------------------------- def estimate_density( distances, transect_length: float, truncation: float | None = None, area: float | None = None, model: Model | Literal["auto"] = "auto", two_sided: bool = True, n_boot: int = 1000, ci: float = 0.95, seed: int | None = 0, ) -> DistanceSamplingResult: """Estimate density (and abundance) from perpendicular detection distances. Args: distances: perpendicular distances of detected objects to the transect line (same length unit throughout). transect_length: total length L of the survey line(s). truncation: right-truncation width w. Detections beyond w are discarded (standard practice — the far tail is noisy). Defaults to the largest observed distance. area: survey region area A, to also return abundance N = D * A. Optional. model: "half-normal", "hazard-rate", or "auto" (pick lower AIC). two_sided: True for a line transect detecting on both sides (effective covered area = 2 * ESW * L); False for one-sided strip. n_boot: bootstrap iterations for the confidence interval. ci: confidence level (0.95 -> 95% interval). seed: RNG seed for reproducible bootstrap (None for nondeterministic). Returns: DistanceSamplingResult with density, CI, ESW, detection probability, encounter rate, and (if area given) abundance. """ d_all = np.asarray(distances, dtype=float) d_all = d_all[np.isfinite(d_all) & (d_all >= 0)] if len(d_all) < 2: raise ValueError("Need at least 2 non-negative distances.") w = float(truncation) if truncation is not None else float(d_all.max()) d = d_all[d_all <= w] if len(d) < 2: raise ValueError("Need at least 2 distances within the truncation width.") side = 2.0 if two_sided else 1.0 def fit_and_density(dist: np.ndarray, n: int, chosen: Model | None): if model == "auto" and chosen is None: cands = [] for m in ("half-normal", "hazard-rate"): try: df_m = _fit_one(m, dist, w) cands.append((df_m.aic(dist), m, df_m)) except Exception: # noqa: BLE001 — a failed candidate just drops out continue if not cands: raise RuntimeError("No detection function could be fit.") _, m_best, df_best = min(cands, key=lambda t: t[0]) else: m_best = chosen or (model if model != "auto" else "half-normal") df_best = _fit_one(m_best, dist, w) mu = df_best.esw() D = n / (side * transect_length * mu) return D, df_best, m_best, mu # Point estimate on the observed data D_hat, df, m_best, mu_hat = fit_and_density(d, len(d), None) # Bootstrap CI: Poisson-resample n (encounter-rate variance) and resample # distances with replacement (detection-function variance), refit each time. rng = np.random.default_rng(seed) boot = np.empty(n_boot, dtype=float) n_obs = len(d) for i in range(n_boot): n_star = rng.poisson(n_obs) if n_star < 2: boot[i] = np.nan continue d_star = rng.choice(d, size=n_star, replace=True) try: D_star, *_ = fit_and_density(d_star, n_star, m_best) boot[i] = D_star except Exception: # noqa: BLE001 boot[i] = np.nan boot = boot[np.isfinite(boot)] alpha = 1.0 - ci lo, hi = np.quantile(boot, [alpha / 2.0, 1.0 - alpha / 2.0]) cv = float(np.std(boot, ddof=1) / D_hat) if D_hat > 0 else float("nan") N = D_hat * area if area is not None else None N_ci = (lo * area, hi * area) if area is not None else None return DistanceSamplingResult( model=m_best, n=n_obs, transect_length=float(transect_length), truncation_w=w, area=area, density=float(D_hat), density_ci=(float(lo), float(hi)), abundance=(float(N) if N is not None else None), abundance_ci=((float(N_ci[0]), float(N_ci[1])) if N_ci is not None else None), esw=float(mu_hat), p_detect=float(mu_hat / w), encounter_rate=float(n_obs / transect_length), sigma=float(df.params[0]), aic=float(df.aic(d)), cv_density=cv, params=df.params, )