File size: 3,738 Bytes
5ec9e9d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
from typing import Any, Dict, List, Tuple

from sundew.gating import gate_probability_with_hysteresis, significance_score


def _clamp(value: float, low: float = 0.0, high: float = 1.0) -> float:
    return max(low, min(high, value))


def _window_features(window: List[float]) -> Dict[str, float]:
    """
    Compute basic features for a window to feed Sundew's significance score.
    """
    if not window:
        return {"magnitude": 0.0, "anomaly_score": 0.0, "context_relevance": 0.0, "urgency": 0.0}

    length = float(len(window))
    mean = sum(window) / length
    mean_abs = sum(abs(x) for x in window) / length
    max_abs = max(abs(x) for x in window)
    variance = sum((x - mean) ** 2 for x in window) / length

    magnitude = _clamp(max_abs * 10.0, 0.0, 10.0) * 10.0  # 0..100 scale
    anomaly_score = _clamp(variance / (variance + 1.0))
    context_relevance = _clamp(mean_abs)
    urgency = _clamp(mean_abs * 0.5)

    return {
        "magnitude": magnitude,
        "anomaly_score": anomaly_score,
        "context_relevance": context_relevance,
        "urgency": urgency,
    }


def gate_signal(
    signal: List[float],
    window_size: int = 128,
    step: int = 64,
    threshold: float = 0.55,
    temperature: float = 0.15,
    return_windows: bool = False,
    max_windows: int = 200,
) -> Tuple[List[float], Dict[str, Any]]:
    """
    Apply Sundew gating over a sliding window to reduce workload.

    Args:
        signal: raw signal values
        window_size: sliding window size
        step: stride between windows
        threshold: gate threshold
        temperature: gate temperature (0=hard)
        return_windows: include per-window metadata
        max_windows: limit number of windows returned (for previews)

    Returns:
        gated_signal: flattened list of selected windows (original signal if nothing selected)
        meta: gating metadata (counts, ratios, thresholds, optional windows)
    """
    if len(signal) < window_size:
        meta = {
            "total_windows": 0,
            "selected_windows": 0,
            "ratio": 1.0,
            "threshold": threshold,
            "temperature": temperature,
        }
        return signal, meta

    last_activation = False
    selected: List[float] = []
    total_windows = 0
    selected_windows = 0
    windows_meta: List[Dict[str, Any]] = []

    for start in range(0, len(signal) - window_size + 1, step):
        window = signal[start : start + window_size]
        total_windows += 1

        features = _window_features(window)
        sig = significance_score(features, w_mag=0.35, w_ano=0.4, w_ctx=0.15, w_urg=0.1)
        prob = gate_probability_with_hysteresis(sig, threshold=threshold, temperature=temperature, last_activation=last_activation)

        chosen = prob >= 0.5
        if chosen:
            selected.extend(window)
            selected_windows += 1
            last_activation = True
        else:
            last_activation = False

        if return_windows and len(windows_meta) < max_windows:
            windows_meta.append(
                {
                    "start": start,
                    "end": start + window_size,
                    "significance": sig,
                    "probability": prob,
                    "selected": chosen,
                }
            )

    if not selected:
        selected = signal  # fall back to full signal

    meta: Dict[str, Any] = {
        "total_windows": total_windows,
        "selected_windows": selected_windows,
        "ratio": len(selected) / max(len(signal), 1),
        "threshold": threshold,
        "temperature": temperature,
    }
    if return_windows:
        meta["windows"] = windows_meta
    return selected, meta