""" infrastructure/processing/scipy_signal_processor.py ───────────────────────────────────────────────────── ScipySignalProcessor — implements SignalProcessor using SciPy. Pipeline (Template Method from SignalProcessor.process()): 1. filter_signal() — Butterworth bandpass (0.5–8 Hz, 4th order) 2. normalize() — Z-score normalisation (mean=0, std=1) 3. segment() — sliding 8-second windows (no overlap) All constants are read from src/shared/constants.py — no magic numbers here. """ from __future__ import annotations import numpy as np from scipy import signal as scipy_signal from src.domain.interfaces.services.signal_processor import SignalProcessor from src.shared.constants import ( PPG_BANDPASS_HIGH, PPG_BANDPASS_LOW, PPG_FILTER_ORDER, PPG_SEGMENT_DURATION_SEC, ) from src.shared.logger import get_logger logger = get_logger(__name__) class ScipySignalProcessor(SignalProcessor): """ PPG signal preprocessor using SciPy's digital signal processing tools. Implements the three abstract steps of SignalProcessor: filter_signal → Butterworth bandpass IIR filter normalize → Z-score (subtract mean, divide by std) segment → Fixed-length windows (non-overlapping) """ # ── Step 1: Filter ──────────────────────────────────────────────────────── def filter_signal( self, signal: np.ndarray, sampling_rate: float, ) -> np.ndarray: """ Apply a 4th-order Butterworth bandpass filter (0.5–8 Hz). Removes: • Baseline wander (< 0.5 Hz — motion artefacts, breathing) • High-frequency noise (> 8 Hz — electronics, EMI) Uses ``sosfiltfilt`` (zero-phase, forward-backward) to avoid phase distortion in the filtered signal. Args: signal: 1-D raw PPG amplitude array. sampling_rate: Sampling rate in Hz. Returns: Filtered 1-D NumPy array (same length as input). """ nyquist = sampling_rate / 2.0 low = PPG_BANDPASS_LOW / nyquist high = PPG_BANDPASS_HIGH / nyquist # Clamp to valid range (< 1.0) to avoid ValueError from scipy low = max(1e-4, min(low, 0.999)) high = max(low + 1e-4, min(high, 0.999)) sos = scipy_signal.butter( N=PPG_FILTER_ORDER, Wn=[low, high], btype="bandpass", output="sos", ) filtered = scipy_signal.sosfiltfilt(sos, signal) logger.debug( "filter_signal() — fs=%.1f Hz, band=[%.2f, %.2f] Hz", sampling_rate, PPG_BANDPASS_LOW, PPG_BANDPASS_HIGH, ) return filtered # ── Step 2: Normalize ───────────────────────────────────────────────────── def normalize(self, signal: np.ndarray) -> np.ndarray: """ Z-score normalise the signal to mean=0, std=1. Handles edge cases: • All-zero or constant signal → return zeros (avoid division by zero). Args: signal: 1-D filtered PPG signal. Returns: Z-score normalised 1-D array. """ mean = np.mean(signal) std = np.std(signal) if std < 1e-8: logger.warning( "normalize() — signal std ≈ 0 (constant signal). " "Returning zero array." ) return np.zeros_like(signal) normalised = (signal - mean) / std logger.debug( "normalize() — mean=%.4f, std=%.4f → Z-score applied", mean, std ) return normalised # ── Step 3: Segment ─────────────────────────────────────────────────────── def segment( self, signal: np.ndarray, sampling_rate: float, ) -> np.ndarray: """ Split the signal into non-overlapping windows of PPG_SEGMENT_DURATION_SEC seconds. Any trailing samples that don't fill a complete window are discarded. Args: signal: 1-D normalised PPG signal. sampling_rate: Sampling rate in Hz. Returns: 2-D NumPy array of shape ``(n_segments, window_size)``. Returns shape ``(0, window_size)`` if the signal is too short. """ window_size = int(PPG_SEGMENT_DURATION_SEC * sampling_rate) if len(signal) < window_size: logger.warning( "segment() — signal length %d < window_size %d. " "Returning empty segments array.", len(signal), window_size, ) return np.empty((0, window_size), dtype=np.float64) n_segments = len(signal) // window_size trimmed = signal[: n_segments * window_size] segments = trimmed.reshape(n_segments, window_size) logger.debug( "segment() — %d segments of %d samples (%.1f s each)", n_segments, window_size, PPG_SEGMENT_DURATION_SEC, ) return segments