File size: 5,560 Bytes
e391a84
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
"""
infrastructure/processing/scipy_signal_processor.py
─────────────────────────────────────────────────────
ScipySignalProcessor β€” implements SignalProcessor using SciPy.

Pipeline (Template Method from SignalProcessor.process()):
    1. filter_signal()  β€” Butterworth bandpass (0.5–8 Hz, 4th order)
    2. normalize()      β€” Z-score normalisation (mean=0, std=1)
    3. segment()        β€” sliding 8-second windows (no overlap)

All constants are read from src/shared/constants.py β€” no magic numbers here.
"""
from __future__ import annotations

import numpy as np
from scipy import signal as scipy_signal

from src.domain.interfaces.services.signal_processor import SignalProcessor
from src.shared.constants import (
    PPG_BANDPASS_HIGH,
    PPG_BANDPASS_LOW,
    PPG_FILTER_ORDER,
    PPG_SEGMENT_DURATION_SEC,
)
from src.shared.logger import get_logger

logger = get_logger(__name__)


class ScipySignalProcessor(SignalProcessor):
    """
    PPG signal preprocessor using SciPy's digital signal processing tools.

    Implements the three abstract steps of SignalProcessor:
        filter_signal β†’ Butterworth bandpass IIR filter
        normalize     β†’ Z-score (subtract mean, divide by std)
        segment       β†’ Fixed-length windows (non-overlapping)
    """

    # ── Step 1: Filter ────────────────────────────────────────────────────────

    def filter_signal(
        self,
        signal: np.ndarray,
        sampling_rate: float,
    ) -> np.ndarray:
        """
        Apply a 4th-order Butterworth bandpass filter (0.5–8 Hz).

        Removes:
          β€’ Baseline wander (< 0.5 Hz β€” motion artefacts, breathing)
          β€’ High-frequency noise (> 8 Hz β€” electronics, EMI)

        Uses ``sosfiltfilt`` (zero-phase, forward-backward) to avoid
        phase distortion in the filtered signal.

        Args:
            signal:        1-D raw PPG amplitude array.
            sampling_rate: Sampling rate in Hz.

        Returns:
            Filtered 1-D NumPy array (same length as input).
        """
        nyquist = sampling_rate / 2.0
        low = PPG_BANDPASS_LOW / nyquist
        high = PPG_BANDPASS_HIGH / nyquist

        # Clamp to valid range (< 1.0) to avoid ValueError from scipy
        low = max(1e-4, min(low, 0.999))
        high = max(low + 1e-4, min(high, 0.999))

        sos = scipy_signal.butter(
            N=PPG_FILTER_ORDER,
            Wn=[low, high],
            btype="bandpass",
            output="sos",
        )
        filtered = scipy_signal.sosfiltfilt(sos, signal)
        logger.debug(
            "filter_signal() β€” fs=%.1f Hz, band=[%.2f, %.2f] Hz",
            sampling_rate,
            PPG_BANDPASS_LOW,
            PPG_BANDPASS_HIGH,
        )
        return filtered

    # ── Step 2: Normalize ─────────────────────────────────────────────────────

    def normalize(self, signal: np.ndarray) -> np.ndarray:
        """
        Z-score normalise the signal to mean=0, std=1.

        Handles edge cases:
          β€’ All-zero or constant signal β†’ return zeros (avoid division by zero).

        Args:
            signal: 1-D filtered PPG signal.

        Returns:
            Z-score normalised 1-D array.
        """
        mean = np.mean(signal)
        std = np.std(signal)

        if std < 1e-8:
            logger.warning(
                "normalize() β€” signal std β‰ˆ 0 (constant signal). "
                "Returning zero array."
            )
            return np.zeros_like(signal)

        normalised = (signal - mean) / std
        logger.debug(
            "normalize() β€” mean=%.4f, std=%.4f β†’ Z-score applied", mean, std
        )
        return normalised

    # ── Step 3: Segment ───────────────────────────────────────────────────────

    def segment(
        self,
        signal: np.ndarray,
        sampling_rate: float,
    ) -> np.ndarray:
        """
        Split the signal into non-overlapping windows of PPG_SEGMENT_DURATION_SEC seconds.

        Any trailing samples that don't fill a complete window are discarded.

        Args:
            signal:        1-D normalised PPG signal.
            sampling_rate: Sampling rate in Hz.

        Returns:
            2-D NumPy array of shape ``(n_segments, window_size)``.
            Returns shape ``(0, window_size)`` if the signal is too short.
        """
        window_size = int(PPG_SEGMENT_DURATION_SEC * sampling_rate)

        if len(signal) < window_size:
            logger.warning(
                "segment() β€” signal length %d < window_size %d. "
                "Returning empty segments array.",
                len(signal),
                window_size,
            )
            return np.empty((0, window_size), dtype=np.float64)

        n_segments = len(signal) // window_size
        trimmed = signal[: n_segments * window_size]
        segments = trimmed.reshape(n_segments, window_size)

        logger.debug(
            "segment() β€” %d segments of %d samples (%.1f s each)",
            n_segments,
            window_size,
            PPG_SEGMENT_DURATION_SEC,
        )
        return segments