File size: 6,072 Bytes
1d197a4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
"""Scoring and event-detection utilities for lumen shape stability."""

from typing import Tuple

import numpy as np


def compute_oblongness_scores(lumen_masks: np.ndarray) -> np.ndarray:
    """Compute per-frame oblongness score in [0, 1] from binary lumen masks."""
    scores = np.zeros((lumen_masks.shape[0],), dtype=np.float32)
    for idx in range(lumen_masks.shape[0]):
        ys, xs = np.where(lumen_masks[idx] > 0)
        if ys.size < 20:
            scores[idx] = np.float32(0.0)
            continue

        x = xs.astype(np.float32)
        y = ys.astype(np.float32)
        x -= float(np.mean(x))
        y -= float(np.mean(y))
        cov = np.cov(np.vstack((x, y)))
        eigvals = np.linalg.eigvalsh(cov)
        eigvals = np.maximum(eigvals, 1e-6)
        axis_ratio = float(np.sqrt(eigvals[-1] / eigvals[0]))
        scores[idx] = np.float32(np.clip((axis_ratio - 1.0) / (axis_ratio + 1.0), 0.0, 1.0))
    return scores


def moving_average(x: np.ndarray, window: int) -> np.ndarray:
    """Centered moving-average smoothing."""
    if window <= 1:
        return x.copy()
    kernel = np.ones((window,), dtype=np.float32) / float(window)
    pad = window // 2
    x_pad = np.pad(x.astype(np.float32), (pad, pad), mode="edge")
    return np.convolve(x_pad, kernel, mode="valid")


def causal_moving_average(x: np.ndarray, window: int) -> np.ndarray:
    """Past-only moving average for online detection."""
    x = x.astype(np.float32)
    if window <= 1:
        return x.copy()
    out = np.zeros_like(x, dtype=np.float32)
    csum = np.cumsum(x, dtype=np.float32)
    for idx in range(x.shape[0]):
        start = max(0, idx - window + 1)
        total = csum[idx] - (csum[start - 1] if start > 0 else 0.0)
        out[idx] = total / float(idx - start + 1)
    return out


def keep_sustained_runs(flags: np.ndarray, min_run: int) -> np.ndarray:
    """Keep only True runs with length >= ``min_run``."""
    kept = np.zeros_like(flags, dtype=bool)
    start = None
    for idx, val in enumerate(flags):
        if val and start is None:
            start = idx
        if (not val or idx == len(flags) - 1) and start is not None:
            end = idx if not val else idx + 1
            if (end - start) >= min_run:
                kept[start:end] = True
            start = None
    return kept


def detect_sustained_bifurcation_signal(oblong_scores: np.ndarray, fps: float) -> Tuple[np.ndarray, np.ndarray]:
    """Detect sustained high-oblongness intervals and return (smoothed_scores, flags)."""
    smooth_window = max(5, int(round(max(fps, 1.0) * 0.5)))
    if smooth_window % 2 == 0:
        smooth_window += 1
    smoothed = moving_average(oblong_scores, smooth_window)

    median = float(np.median(smoothed))
    mad = float(np.median(np.abs(smoothed - median)))
    robust_scale = max(1e-6, 1.4826 * mad)
    robust_z = (smoothed - median) / robust_scale

    candidate = (smoothed > 0.35) & (robust_z > 2.5)
    min_run = max(5, int(round(max(fps, 1.0) * 0.4)))
    sustained = keep_sustained_runs(candidate.astype(bool), min_run=min_run)
    return smoothed.astype(np.float32), sustained


def detect_online_bifurcation_signal(oblong_scores: np.ndarray, fps: float) -> Tuple[np.ndarray, np.ndarray]:
    """Online bifurcation detector using only past information.

    Algorithm:
    - Causal smoothing (past-only moving average).
    - Running robust baseline from a fixed history window.
    - Incremental sustained-run confirmation (no future lookahead).
    """
    x = np.asarray(oblong_scores, dtype=np.float32)
    if x.size == 0:
        return x.copy(), np.zeros((0,), dtype=bool)

    smooth_window = max(5, int(round(max(fps, 1.0) * 0.35)))
    smoothed = causal_moving_average(x, smooth_window)

    baseline_window = max(24, int(round(max(fps, 1.0) * 8.0)))
    warmup = max(12, int(round(max(fps, 1.0) * 1.0)))
    min_run = max(6, int(round(max(fps, 1.0) * 0.4)))

    sustained = np.zeros((x.shape[0],), dtype=bool)
    run_len = 0
    misses_left = 0
    active = False

    # Causal slope signal helps detect onset before level becomes very high.
    slope = np.zeros_like(smoothed, dtype=np.float32)
    slope[1:] = smoothed[1:] - smoothed[:-1]

    for idx in range(x.shape[0]):
        hist_start = max(0, idx - baseline_window)
        history = smoothed[hist_start:idx]  # past only
        if history.size < warmup:
            run_len = 0
            misses_left = 0
            continue

        median = float(np.median(history))
        mad = float(np.median(np.abs(history - median)))
        robust_scale = max(1e-6, 1.4826 * mad)
        robust_z = (float(smoothed[idx]) - median) / robust_scale

        q90 = float(np.quantile(history, 0.9))
        dyn_level = max(0.31, q90 + 0.05)

        slope_hist = slope[hist_start:idx]
        slope_median = float(np.median(slope_hist))
        slope_mad = float(np.median(np.abs(slope_hist - slope_median)))
        slope_scale = max(1e-6, 1.4826 * slope_mad)
        slope_z = (float(slope[idx]) - slope_median) / slope_scale

        level = float(smoothed[idx])
        enter_candidate = (
            (level >= dyn_level and robust_z > 1.95)
            or (level >= 0.29 and robust_z > 1.3 and slope_z > 2.6)
        )
        continue_candidate = (
            (level >= max(0.27, dyn_level - 0.02) and robust_z > 1.2)
            or (level >= 0.25 and robust_z > 1.0 and slope_z > 1.8)
        )
        is_candidate = continue_candidate if active else enter_candidate

        if is_candidate:
            run_len += 1
            misses_left = 1
            active = True
            if run_len >= min_run:
                # Confirm sustained event using only history seen so far.
                sustained[idx - min_run + 1 : idx + 1] = True
        else:
            # Allow a single miss so short dips do not break a sustained run.
            if misses_left > 0 and run_len > 0:
                misses_left -= 1
            else:
                run_len = 0
                active = False

    return smoothed.astype(np.float32), sustained