Spaces:
Sleeping
Sleeping
| from typing import Any, Dict, List, Tuple | |
| from sundew.gating import gate_probability_with_hysteresis, significance_score | |
| def _clamp(value: float, low: float = 0.0, high: float = 1.0) -> float: | |
| return max(low, min(high, value)) | |
| def _window_features(window: List[float]) -> Dict[str, float]: | |
| """ | |
| Compute basic features for a window to feed Sundew's significance score. | |
| """ | |
| if not window: | |
| return {"magnitude": 0.0, "anomaly_score": 0.0, "context_relevance": 0.0, "urgency": 0.0} | |
| length = float(len(window)) | |
| mean = sum(window) / length | |
| mean_abs = sum(abs(x) for x in window) / length | |
| max_abs = max(abs(x) for x in window) | |
| variance = sum((x - mean) ** 2 for x in window) / length | |
| magnitude = _clamp(max_abs * 10.0, 0.0, 10.0) * 10.0 # 0..100 scale | |
| anomaly_score = _clamp(variance / (variance + 1.0)) | |
| context_relevance = _clamp(mean_abs) | |
| urgency = _clamp(mean_abs * 0.5) | |
| return { | |
| "magnitude": magnitude, | |
| "anomaly_score": anomaly_score, | |
| "context_relevance": context_relevance, | |
| "urgency": urgency, | |
| } | |
def gate_signal(
    signal: List[float],
    window_size: int = 128,
    step: int = 64,
    threshold: float = 0.55,
    temperature: float = 0.15,
    return_windows: bool = False,
    max_windows: int = 200,
) -> Tuple[List[float], Dict[str, Any]]:
    """Apply Sundew gating over a sliding window to reduce workload.

    Each full window is turned into features, scored with
    ``significance_score`` and passed to ``gate_probability_with_hysteresis``
    together with the previous window's decision. A window is selected when
    the returned probability is at least 0.5.

    Args:
        signal: raw signal values.
        window_size: sliding window size; must be positive.
        step: stride between window starts; must be positive.
        threshold: gate threshold, forwarded to the gate function.
        temperature: gate temperature (0=hard), forwarded to the gate function.
        return_windows: include per-window metadata under ``meta["windows"]``.
        max_windows: limit on the number of per-window metadata entries
            (for previews); it does not limit how many windows are evaluated.

    Returns:
        gated_signal: flattened list of selected windows. If the signal is
            shorter than ``window_size`` or no window is selected, the input
            ``signal`` object itself is returned.
        meta: gating metadata with ``total_windows``, ``selected_windows``,
            ``ratio`` (output length / input length), ``threshold`` and
            ``temperature``. When ``return_windows`` is true it always also
            contains ``windows`` (an empty list if nothing was evaluated).

    Raises:
        ValueError: if ``window_size`` or ``step`` is not positive.

    Notes:
        When ``step < window_size`` consecutive selected windows overlap, so
        the shared samples appear more than once in the output and ``ratio``
        can exceed 1.0. Samples after the last full window are never scored.
    """
    # Fail early with a clear message instead of an opaque range() error
    # (step == 0) or a silent no-op / empty-window scan (negative values).
    if window_size <= 0:
        raise ValueError(f"window_size must be positive, got {window_size!r}")
    if step <= 0:
        raise ValueError(f"step must be positive, got {step!r}")

    if len(signal) < window_size:
        # Not even one full window: pass the signal through untouched.
        meta: Dict[str, Any] = {
            "total_windows": 0,
            "selected_windows": 0,
            "ratio": 1.0,
            "threshold": threshold,
            "temperature": temperature,
        }
        if return_windows:
            # Keep the metadata schema identical to the normal path.
            meta["windows"] = []
        return signal, meta

    last_activation = False
    selected: List[float] = []
    total_windows = 0
    selected_windows = 0
    windows_meta: List[Dict[str, Any]] = []

    for start in range(0, len(signal) - window_size + 1, step):
        end = start + window_size
        window = signal[start:end]
        total_windows += 1

        features = _window_features(window)
        sig = significance_score(
            features, w_mag=0.35, w_ano=0.4, w_ctx=0.15, w_urg=0.1
        )
        prob = gate_probability_with_hysteresis(
            sig,
            threshold=threshold,
            temperature=temperature,
            last_activation=last_activation,
        )

        chosen = prob >= 0.5
        if chosen:
            selected.extend(window)
            selected_windows += 1
        # The next window's hysteresis depends on this window's decision.
        last_activation = bool(chosen)

        if return_windows and len(windows_meta) < max_windows:
            windows_meta.append(
                {
                    "start": start,
                    "end": end,
                    "significance": sig,
                    "probability": prob,
                    "selected": chosen,
                }
            )

    if not selected:
        selected = signal  # fall back to full signal

    meta = {
        "total_windows": total_windows,
        "selected_windows": selected_windows,
        "ratio": len(selected) / max(len(signal), 1),
        "threshold": threshold,
        "temperature": temperature,
    }
    if return_windows:
        meta["windows"] = windows_meta
    return selected, meta