Aditya2162's picture
Upload folder using huggingface_hub
1d197a4 verified
"""Scoring and event-detection utilities for lumen shape stability."""
from typing import Tuple
import numpy as np
def compute_oblongness_scores(lumen_masks: np.ndarray) -> np.ndarray:
    """Score how elongated the lumen is in every frame.

    The foreground pixels (mask value > 0) of a frame are treated as a 2-D
    point cloud.  The square root of the ratio between the largest and the
    smallest eigenvalue of the coordinate covariance gives an axis ratio
    ``r >= 1``, which is mapped to ``(r - 1) / (r + 1)`` and clipped to
    ``[0, 1]``.

    Args:
        lumen_masks: Stack of masks indexed by frame along axis 0.  Each frame
            is unpacked into (row, column) coordinates, so it must be 2-D.

    Returns:
        float32 array with one score per frame.  Frames with fewer than 20
        foreground pixels score 0.
    """
    min_pixels = 20
    eig_floor = 1e-6

    n_frames = lumen_masks.shape[0]
    scores = np.zeros((n_frames,), dtype=np.float32)

    for frame_idx in range(n_frames):
        rows, cols = np.where(lumen_masks[frame_idx] > 0)
        if rows.size < min_pixels:
            # Too few pixels for a meaningful shape estimate; score stays 0.
            continue

        # Centre the x (column) and y (row) coordinates in float32.
        coords = [cols.astype(np.float32), rows.astype(np.float32)]
        for axis_values in coords:
            axis_values -= float(np.mean(axis_values))

        # eigvalsh returns eigenvalues in ascending order; the floor keeps the
        # ratio finite when the point cloud is (nearly) a line.
        spectrum = np.maximum(np.linalg.eigvalsh(np.cov(np.vstack(coords))), eig_floor)
        axis_ratio = float(np.sqrt(spectrum[-1] / spectrum[0]))

        raw_score = (axis_ratio - 1.0) / (axis_ratio + 1.0)
        scores[frame_idx] = np.float32(np.clip(raw_score, 0.0, 1.0))

    return scores
def moving_average(x: np.ndarray, window: int) -> np.ndarray:
    """Centered moving-average smoothing that preserves the input length.

    The signal is edge-padded (first/last sample repeated) so that the output
    always has exactly as many samples as ``x``.  For an odd ``window`` the
    average is symmetric around each sample.  For an even ``window`` a
    perfectly centered window does not exist, so it covers ``window // 2``
    past samples, the sample itself and ``window // 2 - 1`` future samples.

    Args:
        x: 1-D signal.
        window: Number of samples averaged per output value.

    Returns:
        A copy of ``x`` (original dtype) when ``window <= 1``; otherwise a
        float32 array with the same length as ``x``.
    """
    if window <= 1:
        return x.copy()
    if x.size == 0:
        # np.pad(mode="edge") cannot extend an empty axis; nothing to smooth.
        return x.astype(np.float32)

    kernel = np.ones((window,), dtype=np.float32) / float(window)
    # Pad with window - 1 samples in total so the "valid" convolution yields
    # len(x) outputs for even windows too (symmetric padding yields one extra).
    pad_left = window // 2
    pad_right = window - 1 - pad_left
    x_pad = np.pad(x.astype(np.float32), (pad_left, pad_right), mode="edge")
    return np.convolve(x_pad, kernel, mode="valid")
def causal_moving_average(x: np.ndarray, window: int) -> np.ndarray:
    """Past-only moving average for online detection.

    Output sample ``i`` is the mean of ``x[max(0, i - window + 1) : i + 1]``,
    so the first ``window - 1`` outputs average over a shorter, growing
    window and no future sample is ever used.

    Args:
        x: 1-D signal (anything convertible to a float32 array).
        window: Maximum number of trailing samples averaged per output.

    Returns:
        A new float32 array with the same length as ``x``.  When
        ``window <= 1`` it is simply a float32 copy of ``x``.
    """
    values = np.array(x, dtype=np.float32)  # np.array always copies
    if window <= 1:
        return values

    n = values.shape[0]
    # Accumulate the prefix sums in float64: window sums are differences of
    # two prefix sums, and in float32 the rounding error of a large running
    # total would dominate the small difference on long signals.
    prefix = np.zeros((n + 1,), dtype=np.float64)
    prefix[1:] = np.cumsum(values, dtype=np.float64)

    ends = np.arange(1, n + 1)               # exclusive end of each window
    starts = np.maximum(ends - window, 0)    # inclusive start, clamped at 0
    means = (prefix[ends] - prefix[starts]) / (ends - starts)
    return means.astype(np.float32)
def keep_sustained_runs(flags: np.ndarray, min_run: int) -> np.ndarray:
    """Drop truthy runs that are shorter than ``min_run`` samples.

    Args:
        flags: Sequence of per-sample flags; each element is tested for
            truthiness.
        min_run: Minimum number of consecutive truthy samples a run needs in
            order to be kept.

    Returns:
        Boolean array shaped like ``flags`` that is True only inside runs of
        at least ``min_run`` consecutive truthy samples.
    """
    kept = np.zeros_like(flags, dtype=bool)
    total = len(flags)
    run_start = None

    # Walk one position past the end so that a run touching the last sample
    # is closed by the same branch as a run ended by a falsy sample.
    for pos in range(total + 1):
        inside_run = pos < total and bool(flags[pos])
        if inside_run:
            if run_start is None:
                run_start = pos
        elif run_start is not None:
            if pos - run_start >= min_run:
                kept[run_start:pos] = True
            run_start = None

    return kept
def detect_sustained_bifurcation_signal(oblong_scores: np.ndarray, fps: float) -> Tuple[np.ndarray, np.ndarray]:
    """Detect sustained high-oblongness intervals (offline, whole-signal).

    Steps:
    - Centered moving-average smoothing over roughly 0.5 s (at least 5
      samples, forced to an odd length).
    - Robust z-score of the smoothed signal from its global median and MAD
      (MAD scaled by 1.4826).
    - A sample is a candidate when its smoothed score exceeds 0.35 and its
      robust z-score exceeds 2.5.
    - Only candidate runs of roughly 0.4 s (at least 5 samples) are kept.

    Args:
        oblong_scores: 1-D per-frame oblongness scores (anything convertible
            to a float32 array).
        fps: Frame rate used to size the windows; values below 1 are treated
            as 1.

    Returns:
        ``(smoothed_scores, flags)``: the float32 smoothed signal and a
        boolean array marking sustained intervals.  Both are empty when the
        input is empty.
    """
    scores = np.asarray(oblong_scores, dtype=np.float32)
    if scores.size == 0:
        # Mirror the online detector: nothing to smooth or flag.
        return scores.copy(), np.zeros((0,), dtype=bool)

    frame_rate = max(fps, 1.0)

    smooth_window = max(5, int(round(frame_rate * 0.5)))
    if smooth_window % 2 == 0:
        smooth_window += 1  # odd length keeps the centered window symmetric
    smoothed = moving_average(scores, smooth_window)

    # Median/MAD baseline is insensitive to the (rare) high-score frames.
    median = float(np.median(smoothed))
    mad = float(np.median(np.abs(smoothed - median)))
    robust_scale = max(1e-6, 1.4826 * mad)  # floor avoids division by zero
    robust_z = (smoothed - median) / robust_scale

    candidate = (smoothed > 0.35) & (robust_z > 2.5)

    min_run = max(5, int(round(frame_rate * 0.4)))
    sustained = keep_sustained_runs(candidate.astype(bool), min_run=min_run)
    return smoothed.astype(np.float32), sustained
def detect_online_bifurcation_signal(oblong_scores: np.ndarray, fps: float) -> Tuple[np.ndarray, np.ndarray]:
    """Online bifurcation detector using only past information.

    Algorithm:
    - Causal smoothing (past-only moving average).
    - Running robust baseline (median / MAD / 90th percentile) computed from
      a fixed-length window of strictly earlier smoothed samples.
    - Hysteresis: stricter thresholds to enter an event, looser ones to stay
      in it; a frame-to-frame slope z-score offers an alternative route.
    - Incremental sustained-run confirmation (no future lookahead): once
      enough candidate frames have accumulated, the last ``min_run`` frames
      ending at the current one are flagged.  One non-candidate frame is
      tolerated between candidates.

    Args:
        oblong_scores: Per-frame oblongness scores (converted to float32).
        fps: Frame rate used to size the windows; values below 1 are treated
            as 1.

    Returns:
        ``(smoothed_scores, flags)``: the float32 causally smoothed signal and
        a boolean array of confirmed frames.  Both are empty for empty input.
    """
    scores = np.asarray(oblong_scores, dtype=np.float32)
    if scores.size == 0:
        return scores.copy(), np.zeros((0,), dtype=bool)

    def _robust_z(value: float, past: np.ndarray) -> float:
        # z-score of ``value`` against the median/MAD of ``past``.
        center = float(np.median(past))
        spread = float(np.median(np.abs(past - center)))
        return (value - center) / max(1e-6, 1.4826 * spread)

    rate = max(fps, 1.0)
    smoothed = causal_moving_average(scores, max(5, int(round(rate * 0.35))))
    history_len = max(24, int(round(rate * 8.0)))
    warmup_len = max(12, int(round(rate * 1.0)))
    confirm_len = max(6, int(round(rate * 0.4)))

    n_frames = scores.shape[0]
    flags = np.zeros((n_frames,), dtype=bool)

    # Causal slope signal helps detect onset before level becomes very high.
    delta = np.zeros_like(smoothed, dtype=np.float32)
    delta[1:] = smoothed[1:] - smoothed[:-1]

    hits = 0           # candidate frames counted in the current run
    grace = 0          # non-candidate frames still tolerated
    in_event = False   # selects "continue" vs. "enter" thresholds

    for frame in range(n_frames):
        lo = max(0, frame - history_len)
        past_levels = smoothed[lo:frame]  # past only
        if past_levels.size < warmup_len:
            # Not enough history for a baseline yet.
            hits = 0
            grace = 0
            continue

        level = float(smoothed[frame])
        level_z = _robust_z(level, past_levels)
        slope_z = _robust_z(float(delta[frame]), delta[lo:frame])
        level_gate = max(0.31, float(np.quantile(past_levels, 0.9)) + 0.05)

        if in_event:
            candidate = (
                (level >= max(0.27, level_gate - 0.02) and level_z > 1.2)
                or (level >= 0.25 and level_z > 1.0 and slope_z > 1.8)
            )
        else:
            candidate = (
                (level >= level_gate and level_z > 1.95)
                or (level >= 0.29 and level_z > 1.3 and slope_z > 2.6)
            )

        if candidate:
            hits += 1
            grace = 1
            in_event = True
            if hits >= confirm_len:
                # Confirm sustained event using only history seen so far.
                flags[frame - confirm_len + 1 : frame + 1] = True
        elif grace > 0 and hits > 0:
            # Allow a single miss so short dips do not break a sustained run.
            grace -= 1
        else:
            hits = 0
            in_event = False

    return smoothed.astype(np.float32), flags