| """Line-transect distance sampling — detection-function fit + density estimate. |
| |
| A faithful, dependency-light implementation of conventional distance sampling |
| (Buckland, Anderson, Burnham, Laake, Borchers & Thomas, *Introduction to |
| Distance Sampling*, 2001). It estimates animal **density** (and, given an area, |
| **abundance**) from the perpendicular distances of detected objects to a survey |
| line, correcting for the fact that detection probability falls off with distance. |
| |
| Why this matters for Prometheus: a detector counts what it *sees*; distance |
| sampling estimates what is actually *there*, with confidence intervals — the |
| number an ecologist can put in a management report. |
| |
| Model (line transect, one detection function over [0, w]): |
| |
| detected-distance pdf f(y) = g(y) / mu, mu = integral_0^w g(y) dy |
| effective strip width ESW = mu (per side; g(0) = 1) |
| density D = n / (2 * L * mu) |
| abundance N = D * A |
| avg detection prob. Pa = mu / w |
| encounter rate n / L |
| |
| `mu` is the *effective strip half-width* (ESW): the half-width of a hypothetical |
| strip within which all objects would be detected to give the same expected |
| count. Detection functions supported: half-normal and hazard-rate, selected by |
| AIC. Uncertainty is via a nonparametric bootstrap (encounter-rate variance from |
| a Poisson resample of n, detection-function variance from resampling distances), |
| which is robust and easy to validate against a known synthetic truth. |
| |
| Only numpy + scipy are required; nothing here imports torch/ultralytics. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import math |
| from dataclasses import dataclass, field |
| from typing import Literal |
|
|
| import numpy as np |
| from scipy.integrate import quad |
| from scipy.optimize import minimize |
|
|
# Names of the detection-function families this module can fit.
Model = Literal["half-normal", "hazard-rate"]
|
|
|
|
| |
| |
| |
class DetectionFunction:
    """A detection function g(y) on [0, w] with g(0) = 1.

    Fit by maximum likelihood on the *detected* perpendicular distances. The
    likelihood is the product of f(y_i) = g(y_i) / mu, i.e. the pdf of distances
    conditional on detection within the truncation width w.

    Supported forms:
        half-normal:  g(y) = exp(-y^2 / (2 sigma^2)),     params = (sigma,)
        hazard-rate:  g(y) = 1 - exp(-(y / sigma)^(-b)),  params = (sigma, b)
    """

    # Number of parameters each supported model expects.
    _PARAM_COUNTS = {"half-normal": 1, "hazard-rate": 2}

    def __init__(self, model: Model, params: np.ndarray, w: float):
        """Store the model name, its parameter vector and the truncation width.

        Args:
            model: "half-normal" or "hazard-rate".
            params: (sigma,) for half-normal, (sigma, b) for hazard-rate. A bare
                scalar is accepted for the one-parameter model.
            w: right-truncation width; g is integrated over [0, w].

        Raises:
            ValueError: if the model name is unknown or the number of
                parameters does not match the model.
        """
        expected = self._PARAM_COUNTS.get(model)
        if expected is None:
            raise ValueError(
                f"Unknown detection-function model {model!r}; "
                f"expected one of {sorted(self._PARAM_COUNTS)}."
            )
        values = np.atleast_1d(np.asarray(params, dtype=float))
        if values.ndim != 1 or len(values) != expected:
            raise ValueError(
                f"Model {model!r} takes {expected} parameter(s), got shape {values.shape}."
            )
        # Parameter *values* are deliberately not range-checked here: an
        # optimiser may probe overflowed/underflowed values, which the
        # likelihood reports as +inf rather than as an exception.
        self.model = model
        self.params = values
        self.w = float(w)
        self._n_params = len(self.params)

    def g(self, y: np.ndarray | float) -> np.ndarray:
        """Evaluate the detection probability g(y); g is 1 for y <= 0 (hazard-rate)."""
        y = np.asarray(y, dtype=float)
        if self.model == "half-normal":
            (sigma,) = self.params
            return np.exp(-(y**2) / (2.0 * sigma**2))

        sigma, b = self.params
        with np.errstate(divide="ignore", over="ignore", invalid="ignore"):
            # y <= 0 is mapped to an infinite ratio so the power term is 0 and
            # no 0 ** negative is ever evaluated; those entries are overwritten
            # with exactly 1 below.
            ratio = np.where(y > 0, y / sigma, np.inf)
            # -expm1(-x) equals 1 - exp(-x) but keeps precision for tiny x.
            # The naive form rounds to exactly 0 in the far tail, which would
            # make log g(y) infinite in the likelihood.
            out = -np.expm1(-np.power(ratio, -b))

        return np.where(y <= 0, 1.0, out)

    def esw(self) -> float:
        """Return mu = integral of g over [0, w], the effective strip half-width."""
        if self.model == "half-normal":
            (sigma,) = self.params
            # Closed form of the truncated Gaussian integral.
            return float(
                sigma * math.sqrt(math.pi / 2.0) * math.erf(self.w / (sigma * math.sqrt(2.0)))
            )
        # Hazard-rate has no closed form here; integrate numerically.
        val, _ = quad(lambda y: float(self.g(y)), 0.0, self.w, limit=100)
        return float(val)

    def p_detect(self) -> float:
        """Average detection probability over [0, w] = ESW / w."""
        return self.esw() / self.w

    def neg_log_likelihood(self, distances: np.ndarray) -> float:
        """Return -sum(log f(y_i)) with f = g / mu, or +inf if it is undefined.

        +inf is returned (rather than raising) when mu or any g(y_i) is
        non-positive or non-finite.
        """
        mu = self.esw()
        if mu <= 0 or not np.isfinite(mu):
            return math.inf
        g = self.g(distances)
        if np.any(g <= 0) or not np.all(np.isfinite(g)):
            return math.inf

        return float(-np.sum(np.log(g)) + len(distances) * math.log(mu))

    def aic(self, distances: np.ndarray) -> float:
        """Akaike information criterion: 2 * (number of parameters) + 2 * NLL."""
        return 2.0 * self._n_params + 2.0 * self.neg_log_likelihood(distances)
|
|
|
|
def _fit_one(model: Model, distances: np.ndarray, w: float) -> DetectionFunction:
    """MLE fit of a single detection function to truncated distances.

    Parameters are optimised on the log scale (so they stay positive) with
    Nelder-Mead, starting from a scale derived from the data spread and w.

    Args:
        model: "half-normal" or "hazard-rate".
        distances: perpendicular distances; values outside [0, w] (and NaNs,
            which fail both comparisons) are dropped before fitting.
        w: right-truncation width.

    Returns:
        A DetectionFunction carrying the fitted parameters.

    Raises:
        ValueError: if the model name is unknown, w is not positive and finite,
            fewer than 2 distances remain inside [0, w], or the optimiser never
            reached a finite likelihood.
    """
    if model not in ("half-normal", "hazard-rate"):
        raise ValueError(f"Unknown detection-function model {model!r}.")
    w = float(w)
    # With w <= 0 the effective strip width is 0, the likelihood is +inf
    # everywhere, and the optimiser would hand back its untouched start values.
    if not (math.isfinite(w) and w > 0.0):
        raise ValueError("Truncation width w must be a positive, finite number.")

    d = np.asarray(distances, dtype=float)
    d = d[(d >= 0) & (d <= w)]
    if len(d) < 2:
        raise ValueError("Need at least 2 in-truncation distances to fit a detection function.")
    # Starting scale: never below w / 4, so g is comfortably non-zero at the
    # truncation point and the initial likelihood is finite.
    scale0 = max(float(np.std(d)), w / 4.0, 1e-6)

    def nll(theta):
        # theta holds log-parameters; exponentiate to get sigma (and b).
        return DetectionFunction(model, np.exp(theta), w).neg_log_likelihood(d)

    if model == "half-normal":
        x0 = np.array([math.log(scale0)])
    else:
        # Hazard-rate shape parameter b starts at 2.
        x0 = np.array([math.log(scale0), math.log(2.0)])

    res = minimize(
        nll,
        x0,
        method="Nelder-Mead",
        options={"xatol": 1e-6, "fatol": 1e-6, "maxiter": 2000},
    )
    if not np.isfinite(res.fun):
        raise ValueError(f"Could not fit a {model} detection function: likelihood is not finite.")
    return DetectionFunction(model, np.exp(res.x), w)
|
|
|
|
| |
| |
| |
@dataclass
class DistanceSamplingResult:
    """Outcome of a distance-sampling analysis, as filled in by estimate_density."""

    model: Model  # detection function that was fitted
    n: int  # detections retained within the truncation width
    transect_length: float
    truncation_w: float
    area: float | None

    density: float
    density_ci: tuple[float, float]
    abundance: float | None  # density * area, when an area was supplied
    abundance_ci: tuple[float, float] | None

    esw: float  # effective strip half-width (mu)
    p_detect: float  # esw / truncation_w
    encounter_rate: float  # n / transect_length
    sigma: float  # first parameter of the fitted detection function
    aic: float
    cv_density: float  # bootstrap standard deviation / density
    params: np.ndarray = field(default_factory=lambda: np.array([]))
    # Confidence level the *_ci intervals were computed at; summary() reports it.
    ci_level: float = 0.95

    def summary(self) -> str:
        """Return a multi-line, human-readable report of the estimate."""
        lo, hi = self.density_ci
        # :g keeps "95" for the default and e.g. "99.5" for non-integer levels.
        level = f"{100.0 * self.ci_level:g}%"
        lines = [
            f"Distance sampling — {self.model} detection function",
            f" n detections (within w={self.truncation_w:g}): {self.n}",
            f" effective strip half-width (ESW): {self.esw:.4g}",
            f" avg detection probability: {self.p_detect:.3f}",
            f" encounter rate (n/L): {self.encounter_rate:.4g}",
            f" DENSITY: {self.density:.4g} ({level} CI {lo:.4g}–{hi:.4g}, CV {self.cv_density:.1%})",
        ]
        if self.abundance is not None and self.abundance_ci is not None:
            alo, ahi = self.abundance_ci
            lines.append(f" ABUNDANCE: {self.abundance:.4g} ({level} CI {alo:.4g}–{ahi:.4g})")
        return "\n".join(lines)


def _fit_detection(dist: np.ndarray, w: float, model: str):
    """Fit `model`, or with "auto" fit every candidate and keep the lowest AIC.

    Returns a (model_name, DetectionFunction) pair.
    """
    if model != "auto":
        return model, _fit_one(model, dist, w)

    best = None
    last_error = None
    for name in ("half-normal", "hazard-rate"):
        try:
            fitted = _fit_one(name, dist, w)
            score = fitted.aic(dist)
        except Exception as err:
            # Best effort: a candidate that cannot be fitted is skipped, but
            # the error is kept so a total failure can report its cause.
            last_error = err
            continue
        # Strict "<" keeps the earlier candidate on ties.
        if best is None or score < best[0]:
            best = (score, name, fitted)
    if best is None:
        raise RuntimeError("No detection function could be fit.") from last_error
    return best[1], best[2]


def _density_from_fit(n: int, fitted, sides: float, length: float):
    """Return (density, mu) for n detections, with D = n / (sides * L * mu)."""
    mu = fitted.esw()
    if not (math.isfinite(mu) and mu > 0.0):
        raise ValueError("Fitted effective strip width is not positive and finite.")
    return n / (sides * length * mu), mu


def estimate_density(
    distances,
    transect_length: float,
    truncation: float | None = None,
    area: float | None = None,
    model: Model | Literal["auto"] = "auto",
    two_sided: bool = True,
    n_boot: int = 1000,
    ci: float = 0.95,
    seed: int | None = 0,
) -> DistanceSamplingResult:
    """Estimate density (and abundance) from perpendicular detection distances.

    Args:
        distances: perpendicular distances of detected objects to the transect
            line (same length unit throughout). Non-finite and negative values
            are dropped.
        transect_length: total length L of the survey line(s); must be > 0.
        truncation: right-truncation width w. Detections beyond w are discarded
            (standard practice — the far tail is noisy). Defaults to the largest
            observed distance.
        area: survey region area A, to also return abundance N = D * A. Optional.
        model: "half-normal", "hazard-rate", or "auto" (pick lower AIC).
        two_sided: True for a line transect detecting on both sides (effective
            covered area = 2 * ESW * L); False for one-sided strip.
        n_boot: bootstrap iterations for the confidence interval. With 0, or
            if every replicate fails, the interval and CV are NaN.
        ci: confidence level (0.95 -> 95% interval).
        seed: RNG seed for reproducible bootstrap (None for nondeterministic).

    Returns:
        DistanceSamplingResult with density, CI, ESW, detection probability,
        encounter rate, and (if area given) abundance.

    Raises:
        ValueError: on an unknown model, a non-positive or non-finite
            transect_length / truncation, a negative area or n_boot, a ci
            outside [0, 1], or fewer than 2 usable distances.
        RuntimeError: if model="auto" and no candidate could be fitted.
    """
    # Fail fast on arguments that would otherwise surface as a
    # ZeroDivisionError, a negative density, or an error after the bootstrap.
    if model not in ("auto", "half-normal", "hazard-rate"):
        raise ValueError(
            f"Unknown model {model!r}; expected 'half-normal', 'hazard-rate' or 'auto'."
        )
    transect_length = float(transect_length)
    if not (math.isfinite(transect_length) and transect_length > 0.0):
        raise ValueError("transect_length must be a positive, finite number.")
    if not 0.0 <= ci <= 1.0:
        raise ValueError("ci must lie in [0, 1].")
    if n_boot < 0:
        raise ValueError("n_boot must be non-negative.")
    if area is not None and not area >= 0:
        raise ValueError("area must be non-negative.")

    d_all = np.asarray(distances, dtype=float)
    d_all = d_all[np.isfinite(d_all) & (d_all >= 0)]
    if len(d_all) < 2:
        raise ValueError("Need at least 2 non-negative distances.")
    w = float(truncation) if truncation is not None else float(d_all.max())
    if not (math.isfinite(w) and w > 0.0):
        raise ValueError("Truncation width must be a positive, finite number.")
    d = d_all[d_all <= w]
    if len(d) < 2:
        raise ValueError("Need at least 2 distances within the truncation width.")

    side = 2.0 if two_sided else 1.0
    n_obs = len(d)

    # Point estimate on the observed data.
    m_best, df = _fit_detection(d, w, model)
    D_hat, mu_hat = _density_from_fit(n_obs, df, side, transect_length)

    # Bootstrap: n is redrawn from a Poisson (encounter-rate variance) and the
    # distances are resampled with replacement (detection-function variance).
    # The model chosen above is kept fixed; only its parameters are refitted.
    rng = np.random.default_rng(seed)
    boot = np.full(n_boot, np.nan, dtype=float)
    for i in range(n_boot):
        n_star = rng.poisson(n_obs)
        if n_star < 2:
            continue
        d_star = rng.choice(d, size=n_star, replace=True)
        try:
            _, df_star = _fit_detection(d_star, w, m_best)
            boot[i], _ = _density_from_fit(n_star, df_star, side, transect_length)
        except Exception:
            # A replicate that cannot be fitted is dropped, not fatal.
            boot[i] = np.nan
    boot = boot[np.isfinite(boot)]

    alpha = 1.0 - ci
    if boot.size > 0:
        lo, hi = np.quantile(boot, [alpha / 2.0, 1.0 - alpha / 2.0])
    else:
        lo = hi = float("nan")
    if boot.size >= 2 and D_hat > 0:
        cv = float(np.std(boot, ddof=1) / D_hat)
    else:
        cv = float("nan")

    if area is not None:
        abundance = float(D_hat * area)
        abundance_ci = (float(lo * area), float(hi * area))
    else:
        abundance = None
        abundance_ci = None

    return DistanceSamplingResult(
        model=m_best,
        n=n_obs,
        transect_length=transect_length,
        truncation_w=w,
        area=area,
        density=float(D_hat),
        density_ci=(float(lo), float(hi)),
        abundance=abundance,
        abundance_ci=abundance_ci,
        esw=float(mu_hat),
        p_detect=float(mu_hat / w),
        encounter_rate=float(n_obs / transect_length),
        sigma=float(df.params[0]),
        aic=float(df.aic(d)),
        cv_density=cv,
        params=df.params,
        ci_level=float(ci),
    )
|
|