Instructions to use Aditya2162/ivus-segmentation with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Keras
How to use Aditya2162/ivus-segmentation with Keras:
# Available backend options are: "jax", "torch", "tensorflow". import os os.environ["KERAS_BACKEND"] = "jax" import keras model = keras.saving.load_model("hf://Aditya2162/ivus-segmentation") - Notebooks
- Google Colab
- Kaggle
| """Scoring and event-detection utilities for lumen shape stability.""" | |
| from typing import Tuple | |
| import numpy as np | |
| def compute_oblongness_scores(lumen_masks: np.ndarray) -> np.ndarray: | |
| """Compute per-frame oblongness score in [0, 1] from binary lumen masks.""" | |
| scores = np.zeros((lumen_masks.shape[0],), dtype=np.float32) | |
| for idx in range(lumen_masks.shape[0]): | |
| ys, xs = np.where(lumen_masks[idx] > 0) | |
| if ys.size < 20: | |
| scores[idx] = np.float32(0.0) | |
| continue | |
| x = xs.astype(np.float32) | |
| y = ys.astype(np.float32) | |
| x -= float(np.mean(x)) | |
| y -= float(np.mean(y)) | |
| cov = np.cov(np.vstack((x, y))) | |
| eigvals = np.linalg.eigvalsh(cov) | |
| eigvals = np.maximum(eigvals, 1e-6) | |
| axis_ratio = float(np.sqrt(eigvals[-1] / eigvals[0])) | |
| scores[idx] = np.float32(np.clip((axis_ratio - 1.0) / (axis_ratio + 1.0), 0.0, 1.0)) | |
| return scores | |
| def moving_average(x: np.ndarray, window: int) -> np.ndarray: | |
| """Centered moving-average smoothing.""" | |
| if window <= 1: | |
| return x.copy() | |
| kernel = np.ones((window,), dtype=np.float32) / float(window) | |
| pad = window // 2 | |
| x_pad = np.pad(x.astype(np.float32), (pad, pad), mode="edge") | |
| return np.convolve(x_pad, kernel, mode="valid") | |
| def causal_moving_average(x: np.ndarray, window: int) -> np.ndarray: | |
| """Past-only moving average for online detection.""" | |
| x = x.astype(np.float32) | |
| if window <= 1: | |
| return x.copy() | |
| out = np.zeros_like(x, dtype=np.float32) | |
| csum = np.cumsum(x, dtype=np.float32) | |
| for idx in range(x.shape[0]): | |
| start = max(0, idx - window + 1) | |
| total = csum[idx] - (csum[start - 1] if start > 0 else 0.0) | |
| out[idx] = total / float(idx - start + 1) | |
| return out | |
| def keep_sustained_runs(flags: np.ndarray, min_run: int) -> np.ndarray: | |
| """Keep only True runs with length >= ``min_run``.""" | |
| kept = np.zeros_like(flags, dtype=bool) | |
| start = None | |
| for idx, val in enumerate(flags): | |
| if val and start is None: | |
| start = idx | |
| if (not val or idx == len(flags) - 1) and start is not None: | |
| end = idx if not val else idx + 1 | |
| if (end - start) >= min_run: | |
| kept[start:end] = True | |
| start = None | |
| return kept | |
| def detect_sustained_bifurcation_signal(oblong_scores: np.ndarray, fps: float) -> Tuple[np.ndarray, np.ndarray]: | |
| """Detect sustained high-oblongness intervals and return (smoothed_scores, flags).""" | |
| smooth_window = max(5, int(round(max(fps, 1.0) * 0.5))) | |
| if smooth_window % 2 == 0: | |
| smooth_window += 1 | |
| smoothed = moving_average(oblong_scores, smooth_window) | |
| median = float(np.median(smoothed)) | |
| mad = float(np.median(np.abs(smoothed - median))) | |
| robust_scale = max(1e-6, 1.4826 * mad) | |
| robust_z = (smoothed - median) / robust_scale | |
| candidate = (smoothed > 0.35) & (robust_z > 2.5) | |
| min_run = max(5, int(round(max(fps, 1.0) * 0.4))) | |
| sustained = keep_sustained_runs(candidate.astype(bool), min_run=min_run) | |
| return smoothed.astype(np.float32), sustained | |
| def detect_online_bifurcation_signal(oblong_scores: np.ndarray, fps: float) -> Tuple[np.ndarray, np.ndarray]: | |
| """Online bifurcation detector using only past information. | |
| Algorithm: | |
| - Causal smoothing (past-only moving average). | |
| - Running robust baseline from a fixed history window. | |
| - Incremental sustained-run confirmation (no future lookahead). | |
| """ | |
| x = np.asarray(oblong_scores, dtype=np.float32) | |
| if x.size == 0: | |
| return x.copy(), np.zeros((0,), dtype=bool) | |
| smooth_window = max(5, int(round(max(fps, 1.0) * 0.35))) | |
| smoothed = causal_moving_average(x, smooth_window) | |
| baseline_window = max(24, int(round(max(fps, 1.0) * 8.0))) | |
| warmup = max(12, int(round(max(fps, 1.0) * 1.0))) | |
| min_run = max(6, int(round(max(fps, 1.0) * 0.4))) | |
| sustained = np.zeros((x.shape[0],), dtype=bool) | |
| run_len = 0 | |
| misses_left = 0 | |
| active = False | |
| # Causal slope signal helps detect onset before level becomes very high. | |
| slope = np.zeros_like(smoothed, dtype=np.float32) | |
| slope[1:] = smoothed[1:] - smoothed[:-1] | |
| for idx in range(x.shape[0]): | |
| hist_start = max(0, idx - baseline_window) | |
| history = smoothed[hist_start:idx] # past only | |
| if history.size < warmup: | |
| run_len = 0 | |
| misses_left = 0 | |
| continue | |
| median = float(np.median(history)) | |
| mad = float(np.median(np.abs(history - median))) | |
| robust_scale = max(1e-6, 1.4826 * mad) | |
| robust_z = (float(smoothed[idx]) - median) / robust_scale | |
| q90 = float(np.quantile(history, 0.9)) | |
| dyn_level = max(0.31, q90 + 0.05) | |
| slope_hist = slope[hist_start:idx] | |
| slope_median = float(np.median(slope_hist)) | |
| slope_mad = float(np.median(np.abs(slope_hist - slope_median))) | |
| slope_scale = max(1e-6, 1.4826 * slope_mad) | |
| slope_z = (float(slope[idx]) - slope_median) / slope_scale | |
| level = float(smoothed[idx]) | |
| enter_candidate = ( | |
| (level >= dyn_level and robust_z > 1.95) | |
| or (level >= 0.29 and robust_z > 1.3 and slope_z > 2.6) | |
| ) | |
| continue_candidate = ( | |
| (level >= max(0.27, dyn_level - 0.02) and robust_z > 1.2) | |
| or (level >= 0.25 and robust_z > 1.0 and slope_z > 1.8) | |
| ) | |
| is_candidate = continue_candidate if active else enter_candidate | |
| if is_candidate: | |
| run_len += 1 | |
| misses_left = 1 | |
| active = True | |
| if run_len >= min_run: | |
| # Confirm sustained event using only history seen so far. | |
| sustained[idx - min_run + 1 : idx + 1] = True | |
| else: | |
| # Allow a single miss so short dips do not break a sustained run. | |
| if misses_left > 0 and run_len > 0: | |
| misses_left -= 1 | |
| else: | |
| run_len = 0 | |
| active = False | |
| return smoothed.astype(np.float32), sustained | |