| """ |
| infrastructure/processing/scipy_signal_processor.py |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββ |
| ScipySignalProcessor β implements SignalProcessor using SciPy. |
| |
| Pipeline (Template Method from SignalProcessor.process()): |
| 1. filter_signal() β Butterworth bandpass (0.5β8 Hz, 4th order) |
| 2. normalize() β Z-score normalisation (mean=0, std=1) |
| 3. segment() β sliding 8-second windows (no overlap) |
| |
| All constants are read from src/shared/constants.py β no magic numbers here. |
| """ |
| from __future__ import annotations |
|
|
| import numpy as np |
| from scipy import signal as scipy_signal |
|
|
| from src.domain.interfaces.services.signal_processor import SignalProcessor |
| from src.shared.constants import ( |
| PPG_BANDPASS_HIGH, |
| PPG_BANDPASS_LOW, |
| PPG_FILTER_ORDER, |
| PPG_SEGMENT_DURATION_SEC, |
| ) |
| from src.shared.logger import get_logger |
|
|
| logger = get_logger(__name__) |
|
|
|
|
| class ScipySignalProcessor(SignalProcessor): |
| """ |
| PPG signal preprocessor using SciPy's digital signal processing tools. |
| |
| Implements the three abstract steps of SignalProcessor: |
| filter_signal β Butterworth bandpass IIR filter |
| normalize β Z-score (subtract mean, divide by std) |
| segment β Fixed-length windows (non-overlapping) |
| """ |
|
|
| |
|
|
| def filter_signal( |
| self, |
| signal: np.ndarray, |
| sampling_rate: float, |
| ) -> np.ndarray: |
| """ |
| Apply a 4th-order Butterworth bandpass filter (0.5β8 Hz). |
| |
| Removes: |
| β’ Baseline wander (< 0.5 Hz β motion artefacts, breathing) |
| β’ High-frequency noise (> 8 Hz β electronics, EMI) |
| |
| Uses ``sosfiltfilt`` (zero-phase, forward-backward) to avoid |
| phase distortion in the filtered signal. |
| |
| Args: |
| signal: 1-D raw PPG amplitude array. |
| sampling_rate: Sampling rate in Hz. |
| |
| Returns: |
| Filtered 1-D NumPy array (same length as input). |
| """ |
| nyquist = sampling_rate / 2.0 |
| low = PPG_BANDPASS_LOW / nyquist |
| high = PPG_BANDPASS_HIGH / nyquist |
|
|
| |
| low = max(1e-4, min(low, 0.999)) |
| high = max(low + 1e-4, min(high, 0.999)) |
|
|
| sos = scipy_signal.butter( |
| N=PPG_FILTER_ORDER, |
| Wn=[low, high], |
| btype="bandpass", |
| output="sos", |
| ) |
| filtered = scipy_signal.sosfiltfilt(sos, signal) |
| logger.debug( |
| "filter_signal() β fs=%.1f Hz, band=[%.2f, %.2f] Hz", |
| sampling_rate, |
| PPG_BANDPASS_LOW, |
| PPG_BANDPASS_HIGH, |
| ) |
| return filtered |
|
|
| |
|
|
| def normalize(self, signal: np.ndarray) -> np.ndarray: |
| """ |
| Z-score normalise the signal to mean=0, std=1. |
| |
| Handles edge cases: |
| β’ All-zero or constant signal β return zeros (avoid division by zero). |
| |
| Args: |
| signal: 1-D filtered PPG signal. |
| |
| Returns: |
| Z-score normalised 1-D array. |
| """ |
| mean = np.mean(signal) |
| std = np.std(signal) |
|
|
| if std < 1e-8: |
| logger.warning( |
| "normalize() β signal std β 0 (constant signal). " |
| "Returning zero array." |
| ) |
| return np.zeros_like(signal) |
|
|
| normalised = (signal - mean) / std |
| logger.debug( |
| "normalize() β mean=%.4f, std=%.4f β Z-score applied", mean, std |
| ) |
| return normalised |
|
|
| |
|
|
| def segment( |
| self, |
| signal: np.ndarray, |
| sampling_rate: float, |
| ) -> np.ndarray: |
| """ |
| Split the signal into non-overlapping windows of PPG_SEGMENT_DURATION_SEC seconds. |
| |
| Any trailing samples that don't fill a complete window are discarded. |
| |
| Args: |
| signal: 1-D normalised PPG signal. |
| sampling_rate: Sampling rate in Hz. |
| |
| Returns: |
| 2-D NumPy array of shape ``(n_segments, window_size)``. |
| Returns shape ``(0, window_size)`` if the signal is too short. |
| """ |
| window_size = int(PPG_SEGMENT_DURATION_SEC * sampling_rate) |
|
|
| if len(signal) < window_size: |
| logger.warning( |
| "segment() β signal length %d < window_size %d. " |
| "Returning empty segments array.", |
| len(signal), |
| window_size, |
| ) |
| return np.empty((0, window_size), dtype=np.float64) |
|
|
| n_segments = len(signal) // window_size |
| trimmed = signal[: n_segments * window_size] |
| segments = trimmed.reshape(n_segments, window_size) |
|
|
| logger.debug( |
| "segment() β %d segments of %d samples (%.1f s each)", |
| n_segments, |
| window_size, |
| PPG_SEGMENT_DURATION_SEC, |
| ) |
| return segments |
|
|