File size: 3,383 Bytes
e391a84
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""
tests/unit/test_sa.py
────────────────────────
Unit tests for the Simulated Annealing utility functions.
"""
from __future__ import annotations

import numpy as np
import pytest

from src.infrastructure.processing.sa_helpers import (
    compute_sample_entropy,
    longest_plateau,
    run_simulated_annealing,
)


def test_longest_plateau() -> None:
    # 1. No plateau
    signal1 = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
    assert longest_plateau(signal1) == 0

    # 2. Small plateau of length 3 (indices 1, 2, 3 have diff < 1e-6)
    signal2 = np.array([1.0, 2.0, 2.0, 2.0, 3.0])
    # diff of signal2 is [1, 0, 0, 1]
    # consecutive zeros = 2 diff values < 1e-6, meaning plateau of 3 values
    assert longest_plateau(signal2) == 2

    # 3. Large plateau
    signal3 = np.array([5.0, 5.0, 5.0, 5.0, 5.0, 5.0])
    # diff is [0, 0, 0, 0, 0] -> longest sequence is 5
    assert longest_plateau(signal3) == 5

    # 4. Short signal
    assert longest_plateau(np.array([1.0])) == 0


def test_compute_sample_entropy() -> None:
    # 1. Zero signal (variance < 1e-8)
    signal1 = np.zeros(100)
    assert compute_sample_entropy(signal1) == 0.0

    # 2. Perfect sine wave (low entropy / clean)
    t = np.linspace(0, 10, 100)
    signal2 = np.sin(t)
    ent2 = compute_sample_entropy(signal2)
    assert ent2 > 0.0

    # 3. Noisy signal (high entropy)
    rng = np.random.default_rng(42)
    signal3 = rng.normal(0, 1.0, 100)
    ent3 = compute_sample_entropy(signal3)
    # Noisy signal should typically have higher sample entropy than sine wave
    assert ent3 > 0.0


def test_run_simulated_annealing() -> None:
    # Generate mock segments
    n_seg = 10
    rng = np.random.default_rng(42)
    
    # 8 clean segments, 2 dirty segments (one noisy, one with flat plateau)
    ppg = np.zeros((n_seg, 224), dtype=np.float32)
    ecg = np.zeros((n_seg, 224), dtype=np.float32)
    
    for i in range(n_seg):
        t = np.linspace(0, 2*np.pi, 224)
        if i == 0:
            # Noise outlier (high entropy)
            ppg[i] = rng.normal(0, 1.0, 224)
            ecg[i] = np.sin(t)
        elif i == 1:
            # Plateau outlier
            ppg[i] = np.sin(t)
            ppg[i, 50:100] = 0.5  # 50 sample plateau
            ecg[i] = np.sin(t)
        else:
            # Normal clean segment
            ppg[i] = np.sin(t) + rng.normal(0, 0.01, 224)
            ecg[i] = np.sin(t*2) + rng.normal(0, 0.01, 224)

    # Segment SBP/DBP predictions
    sbp_preds = np.array([120.0]*n_seg)
    dbp_preds = np.array([80.0]*n_seg)
    # Outliers have very high SBP/DBP
    sbp_preds[0] = 160.0
    sbp_preds[1] = 180.0

    # Run SA (100 steps for fast test run)
    result = run_simulated_annealing(
        ppg_segments=ppg,
        ecg_segments=ecg,
        sbp_preds=sbp_preds,
        dbp_preds=dbp_preds,
        n_steps=100,
        alpha=0.05,
    )

    # Verify return schema
    assert "optimal_lo" in result
    assert "optimal_hi" in result
    assert "optimal_max_plateau" in result
    assert "best_loss" in result
    assert "clean_indices" in result
    assert "history" in result

    # Verify that clean indices filtered out the outliers (index 0 and 1)
    clean_indices = result["clean_indices"]
    assert 0 not in clean_indices
    assert 1 not in clean_indices
    assert len(clean_indices) == 8