"""Scoring and event-detection utilities for lumen shape stability.""" from typing import Tuple import numpy as np def compute_oblongness_scores(lumen_masks: np.ndarray) -> np.ndarray: """Compute per-frame oblongness score in [0, 1] from binary lumen masks.""" scores = np.zeros((lumen_masks.shape[0],), dtype=np.float32) for idx in range(lumen_masks.shape[0]): ys, xs = np.where(lumen_masks[idx] > 0) if ys.size < 20: scores[idx] = np.float32(0.0) continue x = xs.astype(np.float32) y = ys.astype(np.float32) x -= float(np.mean(x)) y -= float(np.mean(y)) cov = np.cov(np.vstack((x, y))) eigvals = np.linalg.eigvalsh(cov) eigvals = np.maximum(eigvals, 1e-6) axis_ratio = float(np.sqrt(eigvals[-1] / eigvals[0])) scores[idx] = np.float32(np.clip((axis_ratio - 1.0) / (axis_ratio + 1.0), 0.0, 1.0)) return scores def moving_average(x: np.ndarray, window: int) -> np.ndarray: """Centered moving-average smoothing.""" if window <= 1: return x.copy() kernel = np.ones((window,), dtype=np.float32) / float(window) pad = window // 2 x_pad = np.pad(x.astype(np.float32), (pad, pad), mode="edge") return np.convolve(x_pad, kernel, mode="valid") def causal_moving_average(x: np.ndarray, window: int) -> np.ndarray: """Past-only moving average for online detection.""" x = x.astype(np.float32) if window <= 1: return x.copy() out = np.zeros_like(x, dtype=np.float32) csum = np.cumsum(x, dtype=np.float32) for idx in range(x.shape[0]): start = max(0, idx - window + 1) total = csum[idx] - (csum[start - 1] if start > 0 else 0.0) out[idx] = total / float(idx - start + 1) return out def keep_sustained_runs(flags: np.ndarray, min_run: int) -> np.ndarray: """Keep only True runs with length >= ``min_run``.""" kept = np.zeros_like(flags, dtype=bool) start = None for idx, val in enumerate(flags): if val and start is None: start = idx if (not val or idx == len(flags) - 1) and start is not None: end = idx if not val else idx + 1 if (end - start) >= min_run: kept[start:end] = True start = None return kept def detect_sustained_bifurcation_signal(oblong_scores: np.ndarray, fps: float) -> Tuple[np.ndarray, np.ndarray]: """Detect sustained high-oblongness intervals and return (smoothed_scores, flags).""" smooth_window = max(5, int(round(max(fps, 1.0) * 0.5))) if smooth_window % 2 == 0: smooth_window += 1 smoothed = moving_average(oblong_scores, smooth_window) median = float(np.median(smoothed)) mad = float(np.median(np.abs(smoothed - median))) robust_scale = max(1e-6, 1.4826 * mad) robust_z = (smoothed - median) / robust_scale candidate = (smoothed > 0.35) & (robust_z > 2.5) min_run = max(5, int(round(max(fps, 1.0) * 0.4))) sustained = keep_sustained_runs(candidate.astype(bool), min_run=min_run) return smoothed.astype(np.float32), sustained def detect_online_bifurcation_signal(oblong_scores: np.ndarray, fps: float) -> Tuple[np.ndarray, np.ndarray]: """Online bifurcation detector using only past information. Algorithm: - Causal smoothing (past-only moving average). - Running robust baseline from a fixed history window. - Incremental sustained-run confirmation (no future lookahead). """ x = np.asarray(oblong_scores, dtype=np.float32) if x.size == 0: return x.copy(), np.zeros((0,), dtype=bool) smooth_window = max(5, int(round(max(fps, 1.0) * 0.35))) smoothed = causal_moving_average(x, smooth_window) baseline_window = max(24, int(round(max(fps, 1.0) * 8.0))) warmup = max(12, int(round(max(fps, 1.0) * 1.0))) min_run = max(6, int(round(max(fps, 1.0) * 0.4))) sustained = np.zeros((x.shape[0],), dtype=bool) run_len = 0 misses_left = 0 active = False # Causal slope signal helps detect onset before level becomes very high. slope = np.zeros_like(smoothed, dtype=np.float32) slope[1:] = smoothed[1:] - smoothed[:-1] for idx in range(x.shape[0]): hist_start = max(0, idx - baseline_window) history = smoothed[hist_start:idx] # past only if history.size < warmup: run_len = 0 misses_left = 0 continue median = float(np.median(history)) mad = float(np.median(np.abs(history - median))) robust_scale = max(1e-6, 1.4826 * mad) robust_z = (float(smoothed[idx]) - median) / robust_scale q90 = float(np.quantile(history, 0.9)) dyn_level = max(0.31, q90 + 0.05) slope_hist = slope[hist_start:idx] slope_median = float(np.median(slope_hist)) slope_mad = float(np.median(np.abs(slope_hist - slope_median))) slope_scale = max(1e-6, 1.4826 * slope_mad) slope_z = (float(slope[idx]) - slope_median) / slope_scale level = float(smoothed[idx]) enter_candidate = ( (level >= dyn_level and robust_z > 1.95) or (level >= 0.29 and robust_z > 1.3 and slope_z > 2.6) ) continue_candidate = ( (level >= max(0.27, dyn_level - 0.02) and robust_z > 1.2) or (level >= 0.25 and robust_z > 1.0 and slope_z > 1.8) ) is_candidate = continue_candidate if active else enter_candidate if is_candidate: run_len += 1 misses_left = 1 active = True if run_len >= min_run: # Confirm sustained event using only history seen so far. sustained[idx - min_run + 1 : idx + 1] = True else: # Allow a single miss so short dips do not break a sustained run. if misses_left > 0 and run_len > 0: misses_left -= 1 else: run_len = 0 active = False return smoothed.astype(np.float32), sustained